
Use iterables / and record batches in to_arrow #53

Merged
merged 12 commits into stac-utils:main
May 22, 2024

Conversation

@bitner (Contributor) commented May 15, 2024

  • Use orjson rather than json for speed.
  • Allow items to be passed in as an iterable
  • Use RecordBatches for processing
    • Since we are processing on record batches, don't use chunked arrays
  • Add a function for reading JSON that can read either ndjson or a JSON file whose items are either in a root list or a FeatureCollection (a rough sketch follows this list)
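For illustration only, here is a minimal sketch of the kind of reader described in the last bullet; the name read_json_chunked, the chunk_size default, and the error handling are assumptions, not this PR's actual code:

from typing import Any, Dict, Iterator, List

import orjson


def read_json_chunked(path: str, chunk_size: int = 8192) -> Iterator[List[Dict[str, Any]]]:
    # Read the whole file once; real code might stream instead.
    with open(path, "rb") as f:
        raw = f.read()

    try:
        parsed = orjson.loads(raw)
    except orjson.JSONDecodeError:
        # Not a single JSON document, so treat it as newline-delimited JSON.
        parsed = None

    if isinstance(parsed, list):
        items = parsed                      # root list of items
    elif isinstance(parsed, dict) and "features" in parsed:
        items = parsed["features"]          # FeatureCollection
    elif parsed is None:
        items = [orjson.loads(line) for line in raw.splitlines() if line.strip()]
    else:
        items = [parsed]                    # a single item object

    # Yield fixed-size chunks; each chunk can become one pyarrow RecordBatch.
    for i in range(0, len(items), chunk_size):
        yield items[i : i + chunk_size]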

@kylebarron (Collaborator) left a comment


This will have some merge conflicts with #50 but overall this looks great 🙏

@bitner (Contributor, Author) commented May 15, 2024

@kylebarron OK, merged in the changes from #50 and converted some more things to using batches. I did change up the full schema sniffing away from using the InferredSchema class. I think with using the parsers that return batches it is a bit more straightforward to just get the batches and check the schemas there, but am happy to revert back to using the class. Tests pass, but I haven't done a round to make sure there aren't any refactored functions that need to be removed or anything like that.
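For context, a rough sketch of the "check the schemas on the batches" approach described here, using assumed names rather than the PR's actual code: unify the schema across the record batches that the parsers return, instead of tracking it in a separate class.

import pyarrow as pa


def unify_batch_schemas(batches):
    # Fold each batch's schema into a single unified schema.
    schema = None
    for batch in batches:
        if schema is None:
            schema = batch.schema
        else:
            schema = pa.unify_schemas(
                [schema, batch.schema], promote_options="permissive"
            )
    return schema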

@bitner marked this pull request as ready for review on May 15, 2024 at 18:51
@bitner requested a review from kylebarron on May 15, 2024 at 18:51
@kylebarron (Collaborator) left a comment


A few comments around schemas, downcasting, and reading JSON

stac_geoparquet/arrow/_to_arrow.py (two resolved review comments; outdated)
    *,
    chunk_size: int = 8192,
    schema: Optional[Union[pa.Schema, InferredSchema]] = None,
@kylebarron (Collaborator) commented:

Why remove the InferredSchema support here? I had intended InferredSchema to act the same as a pa.Schema, but with stronger typing (e.g. InferredSchema is just a newtype around a schema)

stac_geoparquet/json_reader.py (resolved review comment; outdated)
@kylebarron (Collaborator) commented:

Quoting @bitner: "I did change up the full schema sniffing away from using the InferredSchema class"

Oh I see now you removed it entirely. I was thinking it would be useful to have some class where we can assign methods onto it. E.g. InferredSchema.to_stac_arrow_schema(), or some name like that, which would apply all the STAC-GeoParquet schema transformations, and manage the transition between the low-level schema applied to input and the Parquet schema used for saving output
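As an illustration of both InferredSchema comments above (a newtype around a schema, with methods hung onto it), here is a hedged sketch of what such a wrapper could look like; the method names and bodies are assumptions, not the library's actual implementation:

import pyarrow as pa


class InferredSchema:
    """A thin 'newtype' around pa.Schema that gives us a place to attach methods."""

    def __init__(self, inner: pa.Schema) -> None:
        self.inner = inner

    def update(self, batch: pa.RecordBatch) -> "InferredSchema":
        # Fold another batch's schema into the running inferred schema.
        self.inner = pa.unify_schemas(
            [self.inner, batch.schema], promote_options="permissive"
        )
        return self

    def to_stac_arrow_schema(self) -> pa.Schema:
        # Placeholder: this is where the STAC-GeoParquet schema transformations
        # described above (e.g. attaching GeoParquet metadata) would be applied.
        return self.inner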

@kylebarron (Collaborator) commented:
Add a limited scan of first N JSON items to infer schema

Comment on lines 38 to 54
if schema is None:
    unified_batches = []
    for batch in batches:
        if schema is None:
            schema = batch.schema
        else:
            schema = pa.unify_schemas(
                [schema, batch.schema], promote_options="permissive"
            )
        unified_batches.append(update_batch_schema(batch, schema))
    batches = unified_batches

assert schema is not None
schema = schema.with_metadata(_create_geoparquet_metadata())

with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
    writer.write_batch(first_batch)
    for batch in batches_iter:
    for batch in batches:
@kylebarron (Collaborator) commented May 21, 2024:

Just for the record this doesn't work. You can't change the schema you pass to ParquetWriter without also changing the physical schema of the arrow data (I don't know how you do that, maybe with pyarrow.compute.cast or table.cast)

import pyarrow.parquet as pq
import pyarrow as pa

table = pa.table({"a": [1, 2, 3, 4]})
assert pa.types.is_int64(table.schema.field("a").type)

with pq.ParquetWriter("test.parquet", pa.schema([pa.field("a", pa.int32())])) as writer:
    writer.write_table(table)

gives


ValueError: Table schema does not match schema used to create file: 
table:
a: int64 vs. 
file:
a: int32

Separately, this code was already exhausting the iterator, so it wouldn't have had batches to actually write to parquet
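A small sketch along the lines this comment suggests (assuming Table.cast is acceptable here): cast the Arrow data itself so its physical schema matches the schema handed to ParquetWriter, rather than only swapping the schema object.

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"a": [1, 2, 3, 4]})            # physical type is int64
target = pa.schema([pa.field("a", pa.int32())])  # schema we want in the file

# Downcast the data to match the target schema before writing.
casted = table.cast(target)

with pq.ParquetWriter("test.parquet", target) as writer:
    writer.write_table(casted)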

@kylebarron (Collaborator) commented:
Add a limited scan of first N JSON items to infer schema

The last commit added a limit parameter to parse_stac_ndjson_to_arrow
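For illustration, a hedged sketch of the limited-scan idea; the helper below is an assumption for this transcript, not the actual parse_stac_ndjson_to_arrow API:

from itertools import islice
from typing import Any, Dict, Iterable

import pyarrow as pa


def infer_schema_with_limit(items: Iterable[Dict[str, Any]], limit: int = 1000) -> pa.Schema:
    # Look at only the first `limit` items instead of scanning the whole input.
    sample = list(islice(items, limit))
    return pa.RecordBatch.from_pylist(sample).schema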

@kylebarron merged commit 22abad6 into stac-utils:main on May 22, 2024
1 check passed
@kylebarron changed the title from "User iterables / and record batches in to_arrow" to "Use iterables / and record batches in to_arrow" on Jun 25, 2024