
Commit

Add schema inference with limit
kylebarron committed May 22, 2024
1 parent a1a23e0 commit 268aab6
Showing 4 changed files with 22 additions and 7 deletions.
8 changes: 6 additions & 2 deletions stac_geoparquet/arrow/_schema/models.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Any, Dict, Iterable, Sequence, Union
+from typing import Any, Dict, Iterable, Optional, Sequence, Union

 import pyarrow as pa

@@ -30,15 +30,19 @@ def update_from_json(
         path: Union[str, Path, Iterable[Union[str, Path]]],
         *,
         chunk_size: int = 65536,
+        limit: Optional[int] = None,
     ) -> None:
         """
         Update this inferred schema from one or more newline-delimited JSON STAC files.

         Args:
             path: One or more paths to files with STAC items.
             chunk_size: The chunk size to load into memory at a time. Defaults to 65536.
+
+        Other args:
+            limit: The maximum number of JSON Items to use for schema inference
         """
-        for batch in read_json_chunked(path, chunk_size=chunk_size):
+        for batch in read_json_chunked(path, chunk_size=chunk_size, limit=limit):
             self.update_from_items(batch)

     def update_from_items(self, items: Sequence[Dict[str, Any]]) -> None:
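With this change, schema inference can be capped at a fixed number of items. A minimal usage sketch of the new keyword (the ndjson path is hypothetical; InferredSchema is defined in stac_geoparquet/arrow/_schema/models.py per the diff above):

from stac_geoparquet.arrow._schema.models import InferredSchema

inferred = InferredSchema()
# Resolve the schema from at most the first 1000 items instead of
# scanning the entire file.
inferred.update_from_json("items.ndjson", chunk_size=65536, limit=1000)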
6 changes: 5 additions & 1 deletion stac_geoparquet/arrow/_to_arrow.py
@@ -62,6 +62,7 @@ def parse_stac_ndjson_to_arrow(
     *,
     chunk_size: int = 65536,
     schema: Optional[pa.Schema] = None,
+    limit: Optional[int] = None,
 ) -> Iterator[pa.RecordBatch]:
     """
     Convert one or more newline-delimited JSON STAC files to a generator of Arrow
@@ -78,14 +79,17 @@
             In this case, there will be two full passes over the input data: one to
             infer a common schema across all data and another to read the data.
+
+    Other args:
+        limit: The maximum number of JSON Items to use for schema inference

     Yields:
         Arrow RecordBatch with a single chunk of Item data.
     """
     # If the schema was not provided, then we need to load all data into memory at once
     # to perform schema resolution.
     if schema is None:
         inferred_schema = InferredSchema()
-        inferred_schema.update_from_json(path, chunk_size=chunk_size)
+        inferred_schema.update_from_json(path, chunk_size=chunk_size, limit=limit)
         yield from parse_stac_ndjson_to_arrow(
             path, chunk_size=chunk_size, schema=inferred_schema
         )
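Downstream, parse_stac_ndjson_to_arrow forwards the cap to the inference pass only; the second pass still reads the data in full. A usage sketch (hypothetical file path; the import is assumed from the module location shown above, the function may also be re-exported from stac_geoparquet.arrow):

import pyarrow as pa

from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow

# No schema supplied: the first pass infers the schema from at most 100
# items, then the generator re-reads the whole file with that schema.
batches = parse_stac_ndjson_to_arrow("items.ndjson", limit=100)
table = pa.Table.from_batches(batches)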
6 changes: 5 additions & 1 deletion stac_geoparquet/arrow/_util.py
@@ -28,14 +28,18 @@ def update_batch_schema(


 def batched_iter(
-    lst: Iterable[Dict[str, Any]], n: int
+    lst: Iterable[Dict[str, Any]], n: int, *, limit: Optional[int] = None
 ) -> Iterable[Sequence[Dict[str, Any]]]:
     """Yield successive n-sized chunks from iterable."""
     if n < 1:
         raise ValueError("n must be at least one")
     it = iter(lst)
+    count = 0
     while batch := tuple(islice(it, n)):
         yield batch
+        count += len(batch)
+        if limit and count >= limit:
+            return


 def stac_items_to_arrow(
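Note that the cap is enforced at batch granularity: a batch is always yielded in full, and iteration stops only once the running count has reached limit, so up to one extra batch's worth of items can come through (and a falsy limit of None or 0 disables the cap entirely). A small demonstration, assuming batched_iter is importable from stac_geoparquet.arrow._util as in the diff:

from stac_geoparquet.arrow._util import batched_iter

items = [{"id": i} for i in range(10)]

# n=4, limit=5: the first batch leaves count=4 (< 5); the second raises it
# to 8 (>= 5) and is still yielded in full, so 8 items total are produced.
for batch in batched_iter(items, 4, limit=5):
    print(len(batch))  # prints 4, then 4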
9 changes: 6 additions & 3 deletions stac_geoparquet/json_reader.py
@@ -1,7 +1,7 @@
 """Return an iterator of items from an ndjson, a json array of items, or a featurecollection of items."""

 from pathlib import Path
-from typing import Any, Dict, Iterable, Sequence, Union
+from typing import Any, Dict, Iterable, Optional, Sequence, Union

 import orjson

@@ -39,7 +39,10 @@ def read_json(


 def read_json_chunked(
-    path: Union[str, Path, Iterable[Union[str, Path]]], chunk_size: int
+    path: Union[str, Path, Iterable[Union[str, Path]]],
+    chunk_size: int,
+    *,
+    limit: Optional[int] = None,
 ) -> Iterable[Sequence[Dict[str, Any]]]:
     """Read from a JSON or NDJSON file in chunks of `chunk_size`."""
-    return batched_iter(read_json(path), chunk_size)
+    return batched_iter(read_json(path), chunk_size, limit=limit)
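At the reader level the keyword simply flows through to batched_iter. A sketch (hypothetical input file):

from stac_geoparquet.json_reader import read_json_chunked

# Read 100-item chunks, stopping after the chunk in which the running
# count reaches 500; at most 5 chunks for this configuration.
for chunk in read_json_chunked("items.ndjson", 100, limit=500):
    print(len(chunk))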
