Skip to content

Commit

Permalink
fix format
Browse files Browse the repository at this point in the history
  • Loading branch information
adalpane committed Aug 26, 2024
1 parent 1514c25 commit b6a8f18
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 161 deletions.
6 changes: 5 additions & 1 deletion examples/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ fn main() {
shares[0].name
);
} else {
let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap();
let res = app
.get_dataframe(&tables[0], None)
.unwrap()
.collect()
.unwrap();
println!("Dataframe:\n {}", res);
}
}
Expand Down
197 changes: 136 additions & 61 deletions src/blocking/client.rs

Large diffs are not rendered by default.

201 changes: 136 additions & 65 deletions src/client.rs

Large diffs are not rendered by default.

26 changes: 15 additions & 11 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ pub struct DeltaProtocol {
pub min_reader_version: i32,
pub min_writer_version: i32,
pub reader_features: Vec<String>,
pub writer_features: Vec<String>
pub writer_features: Vec<String>,
}

#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)]
#[serde(untagged)]
pub enum Protocol {
Parquet {
Parquet {
#[serde(rename = "minReaderVersion")]
min_reader_version: i32
min_reader_version: i32,
},
Delta {
Delta {
#[serde(rename = "deltaProtocol")]
delta_protocol: DeltaProtocol
}
delta_protocol: DeltaProtocol,
},
}

#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)]
Expand All @@ -81,14 +81,14 @@ pub struct ParquetMetadata {
pub struct DeltaMetadata {
pub size: usize,
pub num_files: usize,
pub delta_metadata: ParquetMetadata
pub delta_metadata: ParquetMetadata,
}

#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)]
#[serde(untagged)]
pub enum Metadata {
Parquet(ParquetMetadata),
Delta(DeltaMetadata)
Delta(DeltaMetadata),
}

#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)]
Expand Down Expand Up @@ -120,7 +120,11 @@ pub struct DeltaFile {

impl DeltaFile {
pub fn get_url(&self) -> Option<String> {
if let Some(value) = self.delta_single_action.get("add").and_then(|add| add.get("path")) {
if let Some(value) = self
.delta_single_action
.get("add")
.and_then(|add| add.get("path"))
{
return value.as_str().map(|v| v.to_string());
} else {
return None;
Expand All @@ -132,7 +136,7 @@ impl DeltaFile {
#[serde(untagged)]
pub enum File {
Parquet(ParquetFile),
Delta(DeltaFile)
Delta(DeltaFile),
}

#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)]
Expand All @@ -145,4 +149,4 @@ pub struct FilesRequest {
pub predicate_hints: Option<Vec<String>>,
pub limit_hint: Option<i32>,
pub version: Option<i32>,
}
}
4 changes: 3 additions & 1 deletion src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use polars::prelude::*;
use std::path::PathBuf;

pub fn load_parquet_files_as_dataframe(parquet_root_dir_path: &PathBuf) -> Result<LazyFrame, PolarsError> {
pub fn load_parquet_files_as_dataframe(
parquet_root_dir_path: &PathBuf,
) -> Result<LazyFrame, PolarsError> {
let search_pattern = parquet_root_dir_path
.join("*.parquet")
.display()
Expand Down
2 changes: 0 additions & 2 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,3 @@ pub struct FileCache {
pub table_files: TableFiles,
pub file_paths: Vec<PathBuf>,
}


48 changes: 28 additions & 20 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,20 @@ async fn get_table_metadata() {

match meta.protocol {
Protocol::Delta { .. } => assert!(false, "Wrong protocol deserialization"),
Protocol::Parquet { min_reader_version } => assert_eq!(min_reader_version, 1, "Protocol mismatch")
Protocol::Parquet { min_reader_version } => {
assert_eq!(min_reader_version, 1, "Protocol mismatch")
}
};
match meta.metadata {
Metadata::Delta { .. } => assert!(false, "Wrong metadata deserialization"),
Metadata::Parquet ( ParquetMetadata { id, format, name, partition_columns, configuration, .. }) => {
Metadata::Parquet(ParquetMetadata {
id,
format,
name,
partition_columns,
configuration,
..
}) => {
assert_eq!(
id, "cf9c9342-b773-4c7b-a217-037d02ffe5d8",
"Metadata ID mismatch"
Expand All @@ -127,15 +136,8 @@ async fn get_table_metadata() {
format.provider, "parquet",
"Metadata format provider mismatch"
);
assert_eq!(
name, None,
"Metadata name value should be missing"
);
assert_eq!(
partition_columns.len(),
0,
"There should be no partitions"
);
assert_eq!(name, None, "Metadata name value should be missing");
assert_eq!(partition_columns.len(), 0, "There should be no partitions");
assert_eq!(
configuration["conf_1_name"], "conf_1_value",
"Configuration value expected"
Expand Down Expand Up @@ -199,16 +201,12 @@ async fn list_all_table_files() {
table.share, table.schema, table.name
);
let app = create_mocked_test_app(body, &url, method("POST")).await;
let files = app
.client
.list_table_files(&table, None)
.await
.unwrap();
let files = app.client.list_table_files(&table, None).await.unwrap();

assert_eq!(files.files.len(), 2, "File count mismatch");
match &files.files[1] {
File::Parquet(ParquetFile { id, ..}) => assert_eq!(id, "2", "File id mismatch"),
File::Delta(DeltaFile { .. }) => assert!(false, "Wrong file deserialization")
File::Parquet(ParquetFile { id, .. }) => assert_eq!(id, "2", "File id mismatch"),
File::Delta(DeltaFile { .. }) => assert!(false, "Wrong file deserialization"),
};
}

Expand Down Expand Up @@ -341,11 +339,21 @@ async fn get_dataframe() {
.unwrap()
.to_string();

let df = c.get_dataframe(&table, None).await.unwrap().collect().unwrap();
let df = c
.get_dataframe(&table, None)
.await
.unwrap()
.collect()
.unwrap();
assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch");

// Get the data again, this time it should be served from the local cache (enforced by Expectations set on Mocks)
let df1 = c.get_dataframe(&table, None).await.unwrap().collect().unwrap();
let df1 = c
.get_dataframe(&table, None)
.await
.unwrap()
.collect()
.unwrap();
assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch");
assert_eq!(
df1.get_row(0).unwrap().0[1],
Expand Down

0 comments on commit b6a8f18

Please sign in to comment.