From 5bca2cb587fbee3c570823ddeebf78f4a771030e Mon Sep 17 00:00:00 2001 From: David Bitner Date: Wed, 15 May 2024 10:44:37 -0500 Subject: [PATCH 01/11] add json reader that can accomodate json and ndjson, user iterable of items and record batches for converting to arrow --- pyproject.toml | 1 + stac_geoparquet/arrow/_to_arrow.py | 330 +++++++++++++++-------------- stac_geoparquet/json_reader.py | 22 ++ 3 files changed, 190 insertions(+), 163 deletions(-) create mode 100644 stac_geoparquet/json_reader.py diff --git a/pyproject.toml b/pyproject.toml index 0003844..1d74323 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "pyproj", "pystac", "shapely", + "orjson", ] [tool.hatch.version] diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index 5688e31..d76c9a6 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -1,37 +1,41 @@ """Convert STAC data into Arrow tables""" -import json from copy import deepcopy -from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional, Sequence, Union, Generator +from typing import Any, Dict, Optional, Sequence, Union, Iterable import ciso8601 +from itertools import islice import numpy as np import pyarrow as pa import pyarrow.compute as pc import shapely import shapely.geometry +import orjson from stac_geoparquet.arrow._to_parquet import WGS84_CRS_JSON +from stac_geoparquet.json_reader import read_json def _chunks( - lst: Sequence[Dict[str, Any]], n: int -) -> Generator[Sequence[Dict[str, Any]], None, None]: - """Yield successive n-sized chunks from lst.""" - for i in range(0, len(lst), n): - yield lst[i : i + n] - - -def parse_stac_items_to_arrow( - items: Sequence[Dict[str, Any]], + lst: Iterable[Dict[str, Any]], n: int +) -> Iterable[Sequence[Dict[str, Any]]]: + """Yield successive n-sized chunks from iterable.""" + if n < 1: + raise ValueError("n must be at least one") + it = iter(lst) + while batch := tuple(islice(it, n)): + yield batch + + +def parse_stac_items_to_batches( + items: Iterable[Dict[str, Any]], *, chunk_size: int = 8192, schema: Optional[pa.Schema] = None, downcast: bool = True, -) -> pa.Table: - """Parse a collection of STAC Items to a :class:`pyarrow.Table`. +) -> Iterable[pa.RecordBatch]: + """Parse a collection of STAC Items to an iterable of :class:`pyarrow.RecordBatch`. The objects under `properties` are moved up to the top-level of the Table, similar to :meth:`geopandas.GeoDataFrame.from_features`. @@ -47,67 +51,59 @@ def parse_stac_items_to_arrow( downcast: if True, store bbox as float32 for memory and disk saving. Returns: - a pyarrow Table with the STAC-GeoParquet representation of items. + an iterable of pyarrow RecordBatches with the STAC-GeoParquet representation of items. """ if schema is not None: # If schema is provided, then for better memory usage we parse input STAC items # to Arrow batches in chunks. - batches = [] for chunk in _chunks(items, chunk_size): - batches.append(_stac_items_to_arrow(chunk, schema=schema)) - - table = pa.Table.from_batches(batches, schema=schema) + yield _stac_items_to_arrow(chunk, schema=schema, downcast=downcast) else: - # If schema is _not_ provided, then we must convert to Arrow all at once, or - # else it would be possible for a STAC item late in the collection (after the - # first chunk) to have a different schema and not match the schema inferred for - # the first chunk. 
- table = pa.Table.from_batches([_stac_items_to_arrow(items)]) - - return _process_arrow_table(table, downcast=downcast) + yield _stac_items_to_arrow(items, downcast=downcast) -def parse_stac_ndjson_to_arrow( - path: Union[str, Path], +def parse_stac_items_to_arrow( + items: Iterable[Dict[str, Any]], *, chunk_size: int = 8192, schema: Optional[pa.Schema] = None, downcast: bool = True, ) -> pa.Table: - # Define outside of if/else to make mypy happy - items: List[dict] = [] - - # If the schema was not provided, then we need to load all data into memory at once - # to perform schema resolution. - if schema is None: - with open(path) as f: - for line in f: - items.append(json.loads(line)) - - return parse_stac_items_to_arrow(items, chunk_size=chunk_size, schema=schema) + batches = parse_stac_items_to_batches( + items, chunk_size=chunk_size, schema=schema, downcast=downcast + ) + return pa.Table.from_batches(batches, schema=schema) - # Otherwise, we can stream over the input, converting each batch of `chunk_size` - # into an Arrow RecordBatch at a time. This is much more memory efficient. - with open(path) as f: - batches: List[pa.RecordBatch] = [] - for line in f: - items.append(json.loads(line)) - if len(items) >= chunk_size: - batches.append(_stac_items_to_arrow(items, schema=schema)) - items = [] +def parse_stac_ndjson_to_batches( + path: Union[str, Path], + *, + chunk_size: int = 8192, + schema: Optional[pa.Schema] = None, + downcast: bool = True, +) -> Iterable[pa.RecordBatch]: + return parse_stac_items_to_batches( + read_json(path), chunk_size=chunk_size, schema=schema, downcast=downcast + ) - # Don't forget the last chunk in case the total number of items is not a multiple of - # chunk_size. - if len(items) > 0: - batches.append(_stac_items_to_arrow(items, schema=schema)) - table = pa.Table.from_batches(batches, schema=schema) - return _process_arrow_table(table, downcast=downcast) +def parse_stac_ndjson_to_arrow( + path: Union[str, Path], + *, + chunk_size: int = 8192, + schema: Optional[pa.Schema] = None, + downcast: bool = True, +) -> pa.Table: + batches = parse_stac_items_to_batches( + read_json(path), chunk_size=chunk_size, schema=schema, downcast=downcast + ) + return pa.Table.from_batches(batches, schema=schema) -def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table: +def _process_arrow_table( + table: Union[pa.Table, pa.RecordBatch], *, downcast: bool = True +) -> Union[pa.Table, pa.RecordBatch]: table = _bring_properties_to_top_level(table) table = _convert_timestamp_columns(table) table = _convert_bbox_to_struct(table, downcast=downcast) @@ -116,7 +112,10 @@ def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table: def _stac_items_to_arrow( - items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None + items: Iterable[Dict[str, Any]], + *, + schema: Optional[pa.Schema] = None, + downcast: bool = True, ) -> pa.RecordBatch: """Convert dicts representing STAC Items to Arrow @@ -153,10 +152,14 @@ def _stac_items_to_arrow( array = pa.array(wkb_items, type=pa.struct(schema)) else: array = pa.array(wkb_items) - return pa.RecordBatch.from_struct_array(array) + return _process_arrow_table( + pa.RecordBatch.from_struct_array(array), downcast=downcast + ) -def _bring_properties_to_top_level(table: pa.Table) -> pa.Table: +def _bring_properties_to_top_level( + table: Union[pa.Table, pa.RecordBatch], +) -> Union[pa.Table, pa.RecordBatch]: """Bring all the fields inside of the nested "properties" struct to the top level""" 
properties_field = table.schema.field("properties") properties_column = table["properties"] @@ -167,20 +170,32 @@ def _bring_properties_to_top_level(table: pa.Table) -> pa.Table: inner_prop_field, pc.struct_field(properties_column, field_idx) ) - table = table.drop("properties") + table = table.drop_columns( + [ + "properties", + ] + ) return table -def _convert_geometry_to_wkb(table: pa.Table) -> pa.Table: +def _convert_geometry_to_wkb( + table: Union[pa.Table, pa.RecordBatch], +) -> Union[pa.Table, pa.RecordBatch]: """Convert the geometry column in the table to WKB""" geoms = shapely.from_geojson( - [json.dumps(item) for item in table["geometry"].to_pylist()] + [orjson.dumps(item) for item in table["geometry"].to_pylist()] ) wkb_geoms = shapely.to_wkb(geoms) - return table.drop("geometry").append_column("geometry", pa.array(wkb_geoms)) + return table.drop_columns( + [ + "geometry", + ] + ).append_column("geometry", pa.array(wkb_geoms)) -def _convert_timestamp_columns(table: pa.Table) -> pa.Table: +def _convert_timestamp_columns( + table: Union[pa.Table, pa.RecordBatch], +) -> Union[pa.Table, pa.RecordBatch]: """Convert all timestamp columns from a string to an Arrow Timestamp data type""" allowed_column_names = { "datetime", # common metadata @@ -223,29 +238,18 @@ def _convert_timestamp_columns(table: pa.Table) -> pa.Table: return table -def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.ChunkedArray: +def _convert_timestamp_column(column: pa.Array) -> pa.TimestampArray: """Convert an individual timestamp column from string to a Timestamp type""" - chunks = [] - for chunk in column.chunks: - parsed_chunk: List[Optional[datetime]] = [] - for item in chunk: - if not item.is_valid: - parsed_chunk.append(None) - else: - parsed_chunk.append(ciso8601.parse_rfc3339(item.as_py())) - - pyarrow_chunk = pa.array(parsed_chunk) - chunks.append(pyarrow_chunk) - - return pa.chunked_array(chunks) + return pa.array( + [ciso8601.parse_rfc3339(str(t)) for t in column], pa.timestamp("us", tz="UTC") + ) -def _is_bbox_3d(bbox_col: pa.ChunkedArray) -> bool: +def _is_bbox_3d(bbox_col: pa.Array) -> bool: """Infer whether the bounding box column represents 2d or 3d bounding boxes.""" offsets_set = set() - for chunk in bbox_col.chunks: - offsets = chunk.offsets.to_numpy() - offsets_set.update(np.unique(offsets[1:] - offsets[:-1])) + offsets = bbox_col.offsets.to_numpy() + offsets_set.update(np.unique(offsets[1:] - offsets[:-1])) if len(offsets_set) > 1: raise ValueError("Mixed 2d-3d bounding boxes not yet supported") @@ -259,7 +263,9 @@ def _is_bbox_3d(bbox_col: pa.ChunkedArray) -> bool: raise ValueError(f"Unexpected bbox offset: {offset=}") -def _convert_bbox_to_struct(table: pa.Table, *, downcast: bool) -> pa.Table: +def _convert_bbox_to_struct( + table: Union[pa.Table, pa.RecordBatch], *, downcast: bool +) -> Union[pa.Table, pa.RecordBatch]: """Convert bbox column to a struct representation Since the bbox in JSON is stored as an array, pyarrow automatically converts the @@ -281,100 +287,98 @@ def _convert_bbox_to_struct(table: pa.Table, *, downcast: bool) -> pa.Table: bbox_col = table.column(bbox_col_idx) bbox_3d = _is_bbox_3d(bbox_col) - new_chunks = [] - for chunk in bbox_col.chunks: - assert ( - pa.types.is_list(chunk.type) - or pa.types.is_large_list(chunk.type) - or pa.types.is_fixed_size_list(chunk.type) - ) - if bbox_3d: - coords = chunk.flatten().to_numpy().reshape(-1, 6) - else: - coords = chunk.flatten().to_numpy().reshape(-1, 4) + assert ( + pa.types.is_list(bbox_col.type) + or 
pa.types.is_large_list(bbox_col.type) + or pa.types.is_fixed_size_list(bbox_col.type) + ) + if bbox_3d: + coords = bbox_col.flatten().to_numpy().reshape(-1, 6) + else: + coords = bbox_col.flatten().to_numpy().reshape(-1, 4) + + if downcast: + coords = coords.astype(np.float32) + + if bbox_3d: + xmin = coords[:, 0] + ymin = coords[:, 1] + zmin = coords[:, 2] + xmax = coords[:, 3] + ymax = coords[:, 4] + zmax = coords[:, 5] if downcast: - coords = coords.astype(np.float32) - - if bbox_3d: - xmin = coords[:, 0] - ymin = coords[:, 1] - zmin = coords[:, 2] - xmax = coords[:, 3] - ymax = coords[:, 4] - zmax = coords[:, 5] - - if downcast: - # Round min values down to the next float32 value - # Round max values up to the next float32 value - xmin = np.nextafter(xmin, -np.Infinity) - ymin = np.nextafter(ymin, -np.Infinity) - zmin = np.nextafter(zmin, -np.Infinity) - xmax = np.nextafter(xmax, np.Infinity) - ymax = np.nextafter(ymax, np.Infinity) - zmax = np.nextafter(zmax, np.Infinity) - - struct_arr = pa.StructArray.from_arrays( - [ - xmin, - ymin, - zmin, - xmax, - ymax, - zmax, - ], - names=[ - "xmin", - "ymin", - "zmin", - "xmax", - "ymax", - "zmax", - ], - ) + # Round min values down to the next float32 value + # Round max values up to the next float32 value + xmin = np.nextafter(xmin, -np.Infinity) + ymin = np.nextafter(ymin, -np.Infinity) + zmin = np.nextafter(zmin, -np.Infinity) + xmax = np.nextafter(xmax, np.Infinity) + ymax = np.nextafter(ymax, np.Infinity) + zmax = np.nextafter(zmax, np.Infinity) + + struct_arr = pa.StructArray.from_arrays( + [ + xmin, + ymin, + zmin, + xmax, + ymax, + zmax, + ], + names=[ + "xmin", + "ymin", + "zmin", + "xmax", + "ymax", + "zmax", + ], + ) - else: - xmin = coords[:, 0] - ymin = coords[:, 1] - xmax = coords[:, 2] - ymax = coords[:, 3] - - if downcast: - # Round min values down to the next float32 value - # Round max values up to the next float32 value - xmin = np.nextafter(xmin, -np.Infinity) - ymin = np.nextafter(ymin, -np.Infinity) - xmax = np.nextafter(xmax, np.Infinity) - ymax = np.nextafter(ymax, np.Infinity) - - struct_arr = pa.StructArray.from_arrays( - [ - xmin, - ymin, - xmax, - ymax, - ], - names=[ - "xmin", - "ymin", - "xmax", - "ymax", - ], - ) + else: + xmin = coords[:, 0] + ymin = coords[:, 1] + xmax = coords[:, 2] + ymax = coords[:, 3] - new_chunks.append(struct_arr) + if downcast: + # Round min values down to the next float32 value + # Round max values up to the next float32 value + xmin = np.nextafter(xmin, -np.Infinity) + ymin = np.nextafter(ymin, -np.Infinity) + xmax = np.nextafter(xmax, np.Infinity) + ymax = np.nextafter(ymax, np.Infinity) + + struct_arr = pa.StructArray.from_arrays( + [ + xmin, + ymin, + xmax, + ymax, + ], + names=[ + "xmin", + "ymin", + "xmax", + "ymax", + ], + ) - return table.set_column(bbox_col_idx, "bbox", new_chunks) + return table.set_column(bbox_col_idx, "bbox", struct_arr) -def _assign_geoarrow_metadata(table: pa.Table) -> pa.Table: +def _assign_geoarrow_metadata( + table: Union[pa.Table, pa.RecordBatch], +) -> Union[pa.Table, pa.RecordBatch]: """Tag the primary geometry column with `geoarrow.wkb` on the field metadata.""" existing_field_idx = table.schema.get_field_index("geometry") existing_field = table.schema.field(existing_field_idx) ext_metadata = {"crs": WGS84_CRS_JSON} field_metadata = { b"ARROW:extension:name": b"geoarrow.wkb", - b"ARROW:extension:metadata": json.dumps(ext_metadata).encode("utf-8"), + b"ARROW:extension:metadata": orjson.dumps(ext_metadata), } new_field = 
existing_field.with_metadata(field_metadata) return table.set_column( diff --git a/stac_geoparquet/json_reader.py b/stac_geoparquet/json_reader.py new file mode 100644 index 0000000..8bacce6 --- /dev/null +++ b/stac_geoparquet/json_reader.py @@ -0,0 +1,22 @@ +"""Return an iterator of items from an ndjson, a json array of items, or a featurecollection of items.""" + +import orjson +from typing import Iterator, Dict, Any, Union +from pathlib import Path + + +def read_json(path: Union[str, Path]) -> Iterator[Dict[str, Any]]: + """Read a json or ndjson file.""" + with open(path) as f: + try: + # read ndjson + for line in f: + yield orjson.loads(line.strip()) + except orjson.JSONDecodeError: + f.seek(0) + # read full json file as either a list or FeatureCollection + json = orjson.loads(f.read()) + if isinstance(json, list): + yield from json + else: + yield from json["features"] From 6b26022ae40fcc83ec5a4895d4565343fb6a370f Mon Sep 17 00:00:00 2001 From: David Bitner Date: Wed, 15 May 2024 11:10:42 -0500 Subject: [PATCH 02/11] only use batch, not union --- stac_geoparquet/arrow/_to_arrow.py | 76 +++++++++++++++--------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index d76c9a6..9249a05 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -101,14 +101,14 @@ def parse_stac_ndjson_to_arrow( return pa.Table.from_batches(batches, schema=schema) -def _process_arrow_table( - table: Union[pa.Table, pa.RecordBatch], *, downcast: bool = True -) -> Union[pa.Table, pa.RecordBatch]: - table = _bring_properties_to_top_level(table) - table = _convert_timestamp_columns(table) - table = _convert_bbox_to_struct(table, downcast=downcast) - table = _assign_geoarrow_metadata(table) - return table +def _process_arrow_batch( + batch: pa.RecordBatch, *, downcast: bool = True +) -> pa.RecordBatch: + batch = _bring_properties_to_top_level(batch) + batch = _convert_timestamp_columns(batch) + batch = _convert_bbox_to_struct(batch, downcast=downcast) + batch = _assign_geoarrow_metadata(batch) + return batch def _stac_items_to_arrow( @@ -152,41 +152,41 @@ def _stac_items_to_arrow( array = pa.array(wkb_items, type=pa.struct(schema)) else: array = pa.array(wkb_items) - return _process_arrow_table( + return _process_arrow_batch( pa.RecordBatch.from_struct_array(array), downcast=downcast ) def _bring_properties_to_top_level( - table: Union[pa.Table, pa.RecordBatch], -) -> Union[pa.Table, pa.RecordBatch]: + batch: pa.RecordBatch, +) -> pa.RecordBatch: """Bring all the fields inside of the nested "properties" struct to the top level""" - properties_field = table.schema.field("properties") - properties_column = table["properties"] + properties_field = batch.schema.field("properties") + properties_column = batch["properties"] for field_idx in range(properties_field.type.num_fields): inner_prop_field = properties_field.type.field(field_idx) - table = table.append_column( + batch = batch.append_column( inner_prop_field, pc.struct_field(properties_column, field_idx) ) - table = table.drop_columns( + batch = batch.drop_columns( [ "properties", ] ) - return table + return batch def _convert_geometry_to_wkb( - table: Union[pa.Table, pa.RecordBatch], -) -> Union[pa.Table, pa.RecordBatch]: + batch: pa.RecordBatch, +) -> pa.RecordBatch: """Convert the geometry column in the table to WKB""" geoms = shapely.from_geojson( - [orjson.dumps(item) for item in table["geometry"].to_pylist()] + [orjson.dumps(item) 
for item in batch["geometry"].to_pylist()] ) wkb_geoms = shapely.to_wkb(geoms) - return table.drop_columns( + return batch.drop_columns( [ "geometry", ] @@ -194,8 +194,8 @@ def _convert_geometry_to_wkb( def _convert_timestamp_columns( - table: Union[pa.Table, pa.RecordBatch], -) -> Union[pa.Table, pa.RecordBatch]: + batch: pa.RecordBatch, +) -> pa.RecordBatch: """Convert all timestamp columns from a string to an Arrow Timestamp data type""" allowed_column_names = { "datetime", # common metadata @@ -209,11 +209,11 @@ def _convert_timestamp_columns( } for column_name in allowed_column_names: try: - column = table[column_name] + column = batch[column_name] except KeyError: continue - field_index = table.schema.get_field_index(column_name) + field_index = batch.schema.get_field_index(column_name) if pa.types.is_timestamp(column.type): continue @@ -221,12 +221,12 @@ def _convert_timestamp_columns( # STAC allows datetimes to be null. If all rows are null, the column type may be # inferred as null. We cast this to a timestamp column. elif pa.types.is_null(column.type): - table = table.set_column( + batch = batch.set_column( field_index, column_name, column.cast(pa.timestamp("us")) ) elif pa.types.is_string(column.type): - table = table.set_column( + batch = batch.set_column( field_index, column_name, _convert_timestamp_column(column) ) else: @@ -235,7 +235,7 @@ def _convert_timestamp_columns( f" timestamp data type but got {column.type}" ) - return table + return batch def _convert_timestamp_column(column: pa.Array) -> pa.TimestampArray: @@ -264,8 +264,8 @@ def _is_bbox_3d(bbox_col: pa.Array) -> bool: def _convert_bbox_to_struct( - table: Union[pa.Table, pa.RecordBatch], *, downcast: bool -) -> Union[pa.Table, pa.RecordBatch]: + batch: pa.RecordBatch, downcast: bool = True +) -> pa.RecordBatch: """Convert bbox column to a struct representation Since the bbox in JSON is stored as an array, pyarrow automatically converts the @@ -283,8 +283,8 @@ def _convert_bbox_to_struct( Returns: New table """ - bbox_col_idx = table.schema.get_field_index("bbox") - bbox_col = table.column(bbox_col_idx) + bbox_col_idx = batch.schema.get_field_index("bbox") + bbox_col = batch.column(bbox_col_idx) bbox_3d = _is_bbox_3d(bbox_col) assert ( @@ -366,21 +366,21 @@ def _convert_bbox_to_struct( ], ) - return table.set_column(bbox_col_idx, "bbox", struct_arr) + return batch.set_column(bbox_col_idx, "bbox", struct_arr) def _assign_geoarrow_metadata( - table: Union[pa.Table, pa.RecordBatch], -) -> Union[pa.Table, pa.RecordBatch]: + batch: pa.RecordBatch, +) -> pa.RecordBatch: """Tag the primary geometry column with `geoarrow.wkb` on the field metadata.""" - existing_field_idx = table.schema.get_field_index("geometry") - existing_field = table.schema.field(existing_field_idx) + existing_field_idx = batch.schema.get_field_index("geometry") + existing_field = batch.schema.field(existing_field_idx) ext_metadata = {"crs": WGS84_CRS_JSON} field_metadata = { b"ARROW:extension:name": b"geoarrow.wkb", b"ARROW:extension:metadata": orjson.dumps(ext_metadata), } new_field = existing_field.with_metadata(field_metadata) - return table.set_column( - existing_field_idx, new_field, table.column(existing_field_idx) + return batch.set_column( + existing_field_idx, new_field, batch.column(existing_field_idx) ) From 541ad1bd3267a434388654c309d6095256f8f647 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 21 May 2024 13:46:15 -0400 Subject: [PATCH 03/11] remove downcast parameter --- stac_geoparquet/arrow/_to_arrow.py | 15 +++------- 
stac_geoparquet/arrow/_to_parquet.py | 3 +- stac_geoparquet/arrow/_util.py | 44 ++++------------------------ tests/test_arrow.py | 8 +---- 4 files changed, 12 insertions(+), 58 deletions(-) diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index 7cdf7b0..196d47c 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -25,7 +25,6 @@ def parse_stac_items_to_batches( *, chunk_size: int = 8192, schema: Optional[pa.Schema] = None, - downcast: bool = True, ) -> Iterable[pa.RecordBatch]: """Parse a collection of STAC Items to an iterable of :class:`pyarrow.RecordBatch`. @@ -40,13 +39,12 @@ def parse_stac_items_to_batches( schema: The schema of the input data. If provided, can improve memory use; otherwise all items need to be parsed into a single array for schema inference. Defaults to None. - downcast: if True, store bbox as float32 for memory and disk saving. Returns: an iterable of pyarrow RecordBatches with the STAC-GeoParquet representation of items. """ for item_batch in batched_iter(items, chunk_size): - yield stac_items_to_arrow(item_batch, downcast=downcast) + yield stac_items_to_arrow(item_batch) def parse_stac_items_to_arrow( @@ -54,11 +52,8 @@ def parse_stac_items_to_arrow( *, chunk_size: int = 8192, schema: Optional[pa.Schema] = None, - downcast: bool = True, ) -> pa.Table: - batches = parse_stac_items_to_batches( - items, chunk_size=chunk_size, schema=schema, downcast=downcast - ) + batches = parse_stac_items_to_batches(items, chunk_size=chunk_size, schema=schema) if schema is not None: return pa.Table.from_batches(batches, schema=schema) @@ -79,10 +74,9 @@ def parse_stac_ndjson_to_batches( *, chunk_size: int = 8192, schema: Optional[pa.Schema] = None, - downcast: bool = True, ) -> Iterable[pa.RecordBatch]: return parse_stac_items_to_batches( - read_json(path), chunk_size=chunk_size, schema=schema, downcast=downcast + read_json(path), chunk_size=chunk_size, schema=schema ) @@ -91,7 +85,6 @@ def parse_stac_ndjson_to_arrow( *, chunk_size: int = 65536, schema: Optional[pa.Schema] = None, - downcast: bool = True, ) -> Iterator[pa.RecordBatch]: """ Convert one or more newline-delimited JSON STAC files to a generator of Arrow @@ -112,5 +105,5 @@ def parse_stac_ndjson_to_arrow( Arrow RecordBatch with a single chunk of Item data. """ return parse_stac_items_to_arrow( - read_json(path), chunk_size=chunk_size, schema=schema, downcast=downcast + read_json(path), chunk_size=chunk_size, schema=schema ) diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py index 7cc81c0..bd66916 100644 --- a/stac_geoparquet/arrow/_to_parquet.py +++ b/stac_geoparquet/arrow/_to_parquet.py @@ -17,7 +17,6 @@ def parse_stac_ndjson_to_parquet( *, chunk_size: int = 65536, schema: Optional[pa.Schema] = None, - downcast: bool = True, **kwargs: Any, ) -> None: """Convert one or more newline-delimited JSON STAC files to GeoParquet @@ -33,7 +32,7 @@ def parse_stac_ndjson_to_parquet( iteratively convert to GeoParquet. 
""" batches = parse_stac_items_to_batches( - read_json(input_path), chunk_size=chunk_size, schema=schema, downcast=downcast + read_json(input_path), chunk_size=chunk_size, schema=schema ) if schema is None: unified_batches = [] diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py index 4929149..a36a655 100644 --- a/stac_geoparquet/arrow/_util.py +++ b/stac_geoparquet/arrow/_util.py @@ -42,7 +42,6 @@ def stac_items_to_arrow( items: Iterable[Dict[str, Any]], *, schema: Optional[pa.Schema] = None, - downcast: bool = True, ) -> pa.RecordBatch: """Convert dicts representing STAC Items to Arrow @@ -94,9 +93,7 @@ def stac_items_to_arrow( array = pa.array(wkb_items, type=pa.struct(schema)) else: array = pa.array(wkb_items) - return _process_arrow_batch( - pa.RecordBatch.from_struct_array(array), downcast=downcast - ) + return _process_arrow_batch(pa.RecordBatch.from_struct_array(array)) def _bring_properties_to_top_level( @@ -205,9 +202,7 @@ def _is_bbox_3d(bbox_col: pa.Array) -> bool: raise ValueError(f"Unexpected bbox offset: {offset=}") -def _convert_bbox_to_struct( - batch: pa.RecordBatch, downcast: bool = True -) -> pa.RecordBatch: +def _convert_bbox_to_struct(batch: pa.RecordBatch) -> pa.RecordBatch: """Convert bbox column to a struct representation Since the bbox in JSON is stored as an array, pyarrow automatically converts the @@ -216,14 +211,10 @@ def _convert_bbox_to_struct( partitioning in the dataset. Args: - table: _description_ - downcast: if True, will use float32 coordinates for the bounding boxes instead - of float64. Float rounding is applied to ensure the float32 bounding box - strictly contains the original float64 box. This is recommended when - possible to minimize file size. + batch: _description_ Returns: - New table + New record batch """ bbox_col_idx = batch.schema.get_field_index("bbox") bbox_col = batch.column(bbox_col_idx) @@ -239,9 +230,6 @@ def _convert_bbox_to_struct( else: coords = bbox_col.flatten().to_numpy().reshape(-1, 4) - if downcast: - coords = coords.astype(np.float32) - if bbox_3d: xmin = coords[:, 0] ymin = coords[:, 1] @@ -250,16 +238,6 @@ def _convert_bbox_to_struct( ymax = coords[:, 4] zmax = coords[:, 5] - if downcast: - # Round min values down to the next float32 value - # Round max values up to the next float32 value - xmin = np.nextafter(xmin, -np.Infinity) - ymin = np.nextafter(ymin, -np.Infinity) - zmin = np.nextafter(zmin, -np.Infinity) - xmax = np.nextafter(xmax, np.Infinity) - ymax = np.nextafter(ymax, np.Infinity) - zmax = np.nextafter(zmax, np.Infinity) - struct_arr = pa.StructArray.from_arrays( [ xmin, @@ -285,14 +263,6 @@ def _convert_bbox_to_struct( xmax = coords[:, 2] ymax = coords[:, 3] - if downcast: - # Round min values down to the next float32 value - # Round max values up to the next float32 value - xmin = np.nextafter(xmin, -np.Infinity) - ymin = np.nextafter(ymin, -np.Infinity) - xmax = np.nextafter(xmax, np.Infinity) - ymax = np.nextafter(ymax, np.Infinity) - struct_arr = pa.StructArray.from_arrays( [ xmin, @@ -328,11 +298,9 @@ def _assign_geoarrow_metadata( ) -def _process_arrow_batch( - batch: pa.RecordBatch, *, downcast: bool = True -) -> pa.RecordBatch: +def _process_arrow_batch(batch: pa.RecordBatch) -> pa.RecordBatch: batch = _bring_properties_to_top_level(batch) batch = _convert_timestamp_columns(batch) - batch = _convert_bbox_to_struct(batch, downcast=downcast) + batch = _convert_bbox_to_struct(batch) batch = _assign_geoarrow_metadata(batch) return batch diff --git a/tests/test_arrow.py 
b/tests/test_arrow.py index e3f1291..4b201f6 100644 --- a/tests/test_arrow.py +++ b/tests/test_arrow.py @@ -197,13 +197,7 @@ def test_round_trip(collection_id: str): with open(HERE / "data" / f"{collection_id}-pc.json") as f: items = json.load(f) - table = parse_stac_items_to_arrow(items, downcast=True) - items_result = list(stac_table_to_items(table)) - - for result, expected in zip(items_result, items): - assert_json_value_equal(result, expected, precision=0.001) - - table = parse_stac_items_to_arrow(items, downcast=False) + table = parse_stac_items_to_arrow(items) items_result = list(stac_table_to_items(table)) for result, expected in zip(items_result, items): From c935973db483d05815efd6f204abfbf0d7106e79 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 21 May 2024 13:48:30 -0400 Subject: [PATCH 04/11] pass along schema --- stac_geoparquet/arrow/_to_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index 196d47c..174abfb 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -44,7 +44,7 @@ def parse_stac_items_to_batches( an iterable of pyarrow RecordBatches with the STAC-GeoParquet representation of items. """ for item_batch in batched_iter(items, chunk_size): - yield stac_items_to_arrow(item_batch) + yield stac_items_to_arrow(item_batch, schema=schema) def parse_stac_items_to_arrow( From 27030cefa5f46765d1ca6c5ad32d75878606dc74 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 21 May 2024 13:54:43 -0400 Subject: [PATCH 05/11] Fix reading json --- stac_geoparquet/json_reader.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/stac_geoparquet/json_reader.py b/stac_geoparquet/json_reader.py index 8c12006..2d7f003 100644 --- a/stac_geoparquet/json_reader.py +++ b/stac_geoparquet/json_reader.py @@ -6,7 +6,7 @@ def read_json( - path: Union[str, Path, Iterable[Union[str, Path]]] + path: Union[str, Path, Iterable[Union[str, Path]]], ) -> Iterator[Dict[str, Any]]: """Read a json or ndjson file.""" if isinstance(path, (str, Path)): @@ -15,9 +15,16 @@ def read_json( for p in path: with open(p) as f: try: - # read ndjson + # Support ndjson or json list/FeatureCollection without any whitespace + # (all on first line) for line in f: - yield orjson.loads(line.strip()) + item = orjson.loads(line.strip()) + if isinstance(item, list): + yield from item + elif "features" in item: + yield from item["features"] + else: + yield item except orjson.JSONDecodeError: f.seek(0) # read full json file as either a list or FeatureCollection From 4cb16da046b442cd16179c8d218e3061cf37ca00 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 21 May 2024 14:40:31 -0400 Subject: [PATCH 06/11] restore inferredSchema class --- stac_geoparquet/arrow/_schema/models.py | 60 +++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 stac_geoparquet/arrow/_schema/models.py diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py new file mode 100644 index 0000000..5101d28 --- /dev/null +++ b/stac_geoparquet/arrow/_schema/models.py @@ -0,0 +1,60 @@ +from pathlib import Path +from typing import Any, Dict, Iterable, Sequence, Union + +import pyarrow as pa + +from stac_geoparquet.arrow._util import stac_items_to_arrow +from stac_geoparquet.json_reader import read_json + + +class InferredSchema: + """ + A schema representing the original STAC JSON with absolutely minimal modifications. 
+ + The only modification from the data is converting any geometry fields from GeoJSON + to WKB. + """ + + inner: pa.Schema + """The underlying Arrow schema.""" + + count: int + """The total number of items scanned.""" + + def __init__(self) -> None: + self.inner = pa.schema([]) + self.count = 0 + + def update_from_json( + self, + path: Union[str, Path, Iterable[Union[str, Path]]], + *, + chunk_size: int = 65536, + ) -> None: + """ + Update this inferred schema from one or more newline-delimited JSON STAC files. + + Args: + path: One or more paths to files with STAC items. + chunk_size: The chunk size to load into memory at a time. Defaults to 65536. + """ + items = [] + for item in read_json(path): + items.append(item) + + if len(items) >= chunk_size: + self.update_from_items(items) + items = [] + + # Handle remainder + if len(items) > 0: + self.update_from_items(items) + + def update_from_items(self, items: Sequence[Dict[str, Any]]) -> None: + """Update this inferred schema from a sequence of STAC Items.""" + self.count += len(items) + current_schema = stac_items_to_arrow(items, schema=None).schema + new_schema = pa.unify_schemas( + [self.inner, current_schema], promote_options="permissive" + ) + self.inner = new_schema From 18d23ad8cd763bf9ac9425ca42c6e75f0b658004 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 21 May 2024 15:06:56 -0400 Subject: [PATCH 07/11] simplify chunked json reading --- stac_geoparquet/arrow/_schema/models.py | 15 +++------------ stac_geoparquet/arrow/_to_parquet.py | 5 +++-- stac_geoparquet/arrow/_util.py | 5 ++--- stac_geoparquet/json_reader.py | 16 +++++++++++++--- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py index 5101d28..191f67d 100644 --- a/stac_geoparquet/arrow/_schema/models.py +++ b/stac_geoparquet/arrow/_schema/models.py @@ -4,7 +4,7 @@ import pyarrow as pa from stac_geoparquet.arrow._util import stac_items_to_arrow -from stac_geoparquet.json_reader import read_json +from stac_geoparquet.json_reader import read_json_chunked class InferredSchema: @@ -38,17 +38,8 @@ def update_from_json( path: One or more paths to files with STAC items. chunk_size: The chunk size to load into memory at a time. Defaults to 65536. 
""" - items = [] - for item in read_json(path): - items.append(item) - - if len(items) >= chunk_size: - self.update_from_items(items) - items = [] - - # Handle remainder - if len(items) > 0: - self.update_from_items(items) + for batch in read_json_chunked(path, chunk_size=chunk_size): + self.update_from_items(batch) def update_from_items(self, items: Sequence[Dict[str, Any]]) -> None: """Update this inferred schema from a sequence of STAC Items.""" diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py index bd66916..3e0bcbe 100644 --- a/stac_geoparquet/arrow/_to_parquet.py +++ b/stac_geoparquet/arrow/_to_parquet.py @@ -5,6 +5,7 @@ import pyarrow as pa import pyarrow.parquet as pq +from stac_geoparquet.arrow._schema.models import InferredSchema from stac_geoparquet.arrow._to_arrow import parse_stac_items_to_batches from stac_geoparquet.json_reader import read_json from stac_geoparquet.arrow._util import update_batch_schema @@ -12,11 +13,11 @@ def parse_stac_ndjson_to_parquet( - input_path: Union[Union[str, Path], Iterable[Union[str, Path]]], + input_path: Union[str, Path, Iterable[Union[str, Path]]], output_path: Union[str, Path], *, chunk_size: int = 65536, - schema: Optional[pa.Schema] = None, + schema: Optional[Union[pa.Schema, InferredSchema]] = None, **kwargs: Any, ) -> None: """Convert one or more newline-delimited JSON STAC files to GeoParquet diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py index a36a655..6fbacc0 100644 --- a/stac_geoparquet/arrow/_util.py +++ b/stac_geoparquet/arrow/_util.py @@ -39,9 +39,7 @@ def batched_iter( def stac_items_to_arrow( - items: Iterable[Dict[str, Any]], - *, - schema: Optional[pa.Schema] = None, + items: Iterable[Dict[str, Any]], *, schema: Optional[pa.Schema] = None ) -> pa.RecordBatch: """Convert dicts representing STAC Items to Arrow @@ -93,6 +91,7 @@ def stac_items_to_arrow( array = pa.array(wkb_items, type=pa.struct(schema)) else: array = pa.array(wkb_items) + return _process_arrow_batch(pa.RecordBatch.from_struct_array(array)) diff --git a/stac_geoparquet/json_reader.py b/stac_geoparquet/json_reader.py index 2d7f003..fdd4788 100644 --- a/stac_geoparquet/json_reader.py +++ b/stac_geoparquet/json_reader.py @@ -1,13 +1,16 @@ """Return an iterator of items from an ndjson, a json array of items, or a featurecollection of items.""" -import orjson -from typing import Iterator, Dict, Any, Union, Iterable from pathlib import Path +from typing import Any, Dict, Iterable, Sequence, Union + +import orjson + +from stac_geoparquet.arrow._util import batched_iter def read_json( path: Union[str, Path, Iterable[Union[str, Path]]], -) -> Iterator[Dict[str, Any]]: +) -> Iterable[Dict[str, Any]]: """Read a json or ndjson file.""" if isinstance(path, (str, Path)): path = [path] @@ -33,3 +36,10 @@ def read_json( yield from json else: yield from json["features"] + + +def read_json_chunked( + path: Union[str, Path, Iterable[Union[str, Path]]], chunk_size: int +) -> Iterable[Sequence[Dict[str, Any]]]: + """Read from a JSON or NDJSON file in chunks of `chunk_size`.""" + return batched_iter(read_json(path), chunk_size) From 53cd06fe3ff2015ddf5171f28fed250191e70dfe Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 21 May 2024 15:29:53 -0400 Subject: [PATCH 08/11] Fixed schema inference --- stac_geoparquet/arrow/_to_arrow.py | 79 ++++++++++++---------------- stac_geoparquet/arrow/_to_parquet.py | 29 +++------- 2 files changed, 42 insertions(+), 66 deletions(-) diff --git 
a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index 174abfb..bbad7a3 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -12,19 +12,16 @@ import pyarrow as pa -from stac_geoparquet.json_reader import read_json -from stac_geoparquet.arrow._util import ( - stac_items_to_arrow, - batched_iter, - update_batch_schema, -) +from stac_geoparquet.arrow._schema.models import InferredSchema +from stac_geoparquet.json_reader import read_json_chunked +from stac_geoparquet.arrow._util import stac_items_to_arrow, batched_iter -def parse_stac_items_to_batches( +def parse_stac_items_to_arrow( items: Iterable[Dict[str, Any]], *, chunk_size: int = 8192, - schema: Optional[pa.Schema] = None, + schema: Optional[Union[pa.Schema, InferredSchema]] = None, ) -> Iterable[pa.RecordBatch]: """Parse a collection of STAC Items to an iterable of :class:`pyarrow.RecordBatch`. @@ -43,45 +40,25 @@ def parse_stac_items_to_batches( Returns: an iterable of pyarrow RecordBatches with the STAC-GeoParquet representation of items. """ - for item_batch in batched_iter(items, chunk_size): - yield stac_items_to_arrow(item_batch, schema=schema) + if schema is not None: + if isinstance(schema, InferredSchema): + schema = schema.inner + # If schema is provided, then for better memory usage we parse input STAC items + # to Arrow batches in chunks. + for chunk in batched_iter(items, chunk_size): + yield stac_items_to_arrow(chunk, schema=schema) -def parse_stac_items_to_arrow( - items: Iterable[Dict[str, Any]], - *, - chunk_size: int = 8192, - schema: Optional[pa.Schema] = None, -) -> pa.Table: - batches = parse_stac_items_to_batches(items, chunk_size=chunk_size, schema=schema) - if schema is not None: - return pa.Table.from_batches(batches, schema=schema) - - for batch in batches: - if schema is None: - schema = batch.schema - else: - schema = pa.unify_schemas( - [schema, batch.schema], promote_options="permissive" - ) - return pa.Table.from_batches( - (update_batch_schema(batch, schema) for batch in batches), schema=schema - ) - - -def parse_stac_ndjson_to_batches( - path: Union[str, Path], - *, - chunk_size: int = 8192, - schema: Optional[pa.Schema] = None, -) -> Iterable[pa.RecordBatch]: - return parse_stac_items_to_batches( - read_json(path), chunk_size=chunk_size, schema=schema - ) + else: + # If schema is _not_ provided, then we must convert to Arrow all at once, or + # else it would be possible for a STAC item late in the collection (after the + # first chunk) to have a different schema and not match the schema inferred for + # the first chunk. + yield stac_items_to_arrow(items) def parse_stac_ndjson_to_arrow( - path: Union[Union[str, Path], Iterable[Union[str, Path]]], + path: Union[str, Path, Iterable[Union[str, Path]]], *, chunk_size: int = 65536, schema: Optional[pa.Schema] = None, @@ -104,6 +81,18 @@ def parse_stac_ndjson_to_arrow( Yields: Arrow RecordBatch with a single chunk of Item data. """ - return parse_stac_items_to_arrow( - read_json(path), chunk_size=chunk_size, schema=schema - ) + # If the schema was not provided, then we need to load all data into memory at once + # to perform schema resolution. 
+ if schema is None: + inferred_schema = InferredSchema() + inferred_schema.update_from_json(path, chunk_size=chunk_size) + yield from parse_stac_ndjson_to_arrow( + path, chunk_size=chunk_size, schema=inferred_schema + ) + return + + if isinstance(schema, InferredSchema): + schema = schema.inner + + for batch in read_json_chunked(path, chunk_size=chunk_size): + yield stac_items_to_arrow(batch, schema=schema) diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py index 3e0bcbe..294e216 100644 --- a/stac_geoparquet/arrow/_to_parquet.py +++ b/stac_geoparquet/arrow/_to_parquet.py @@ -6,9 +6,7 @@ import pyarrow.parquet as pq from stac_geoparquet.arrow._schema.models import InferredSchema -from stac_geoparquet.arrow._to_arrow import parse_stac_items_to_batches -from stac_geoparquet.json_reader import read_json -from stac_geoparquet.arrow._util import update_batch_schema +from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow from stac_geoparquet.arrow._crs import WGS84_CRS_JSON @@ -32,26 +30,15 @@ def parse_stac_ndjson_to_parquet( infer a common schema across all data and another to read the data and iteratively convert to GeoParquet. """ - batches = parse_stac_items_to_batches( - read_json(input_path), chunk_size=chunk_size, schema=schema - ) - if schema is None: - unified_batches = [] - for batch in batches: - if schema is None: - schema = batch.schema - else: - schema = pa.unify_schemas( - [schema, batch.schema], promote_options="permissive" - ) - unified_batches.append(update_batch_schema(batch, schema)) - batches = unified_batches - - assert schema is not None - schema = schema.with_metadata(_create_geoparquet_metadata()) + batches_iter = parse_stac_ndjson_to_arrow( + input_path, chunk_size=chunk_size, schema=schema + ) + first_batch = next(batches_iter) + schema = first_batch.schema.with_metadata(_create_geoparquet_metadata()) with pq.ParquetWriter(output_path, schema, **kwargs) as writer: - for batch in batches: + writer.write_batch(first_batch) + for batch in batches_iter: writer.write_batch(batch) From a5b95fe2444b0d33da95aa3fb789a152df595cc6 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 21 May 2024 16:17:39 -0400 Subject: [PATCH 09/11] handle iterable output --- README.md | 3 ++- pyproject.toml | 3 ++- tests/test_arrow.py | 5 +++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e8faa7a..f9859de 100644 --- a/README.md +++ b/README.md @@ -19,11 +19,12 @@ Note that `stac_geoparquet` lifts the keys in the item `properties` up to the to >>> import requests >>> import stac_geoparquet.arrow >>> import pyarrow.parquet +>>> import pyarrow as pa >>> items = requests.get( ... "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items" ... 
).json()["features"] ->>> table = stac_geoparquet.arrow.parse_stac_items_to_arrow(items) +>>> table = pa.Table.from_batches(stac_geoparquet.arrow.parse_stac_items_to_arrow(items)) >>> stac_geoparquet.arrow.to_parquet(table, "items.parquet") >>> table2 = pyarrow.parquet.read_table("items.parquet") >>> items2 = list(stac_geoparquet.arrow.stac_table_to_items(table2)) diff --git a/pyproject.toml b/pyproject.toml index 1d74323..319914c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,8 @@ dependencies = [ "geopandas", "packaging", "pandas", - "pyarrow", + # Needed for RecordBatch.append_column + "pyarrow>=16", "pyproj", "pystac", "shapely", diff --git a/tests/test_arrow.py b/tests/test_arrow.py index 4b201f6..2b9bca4 100644 --- a/tests/test_arrow.py +++ b/tests/test_arrow.py @@ -3,6 +3,7 @@ from pathlib import Path from typing import Any, Dict, Sequence, Union +import pyarrow as pa import pytest from ciso8601 import parse_rfc3339 @@ -197,7 +198,7 @@ def test_round_trip(collection_id: str): with open(HERE / "data" / f"{collection_id}-pc.json") as f: items = json.load(f) - table = parse_stac_items_to_arrow(items) + table = pa.Table.from_batches(parse_stac_items_to_arrow(items)) items_result = list(stac_table_to_items(table)) for result, expected in zip(items_result, items): @@ -209,7 +210,7 @@ def test_table_contains_geoarrow_metadata(): with open(HERE / "data" / f"{collection_id}-pc.json") as f: items = json.load(f) - table = parse_stac_items_to_arrow(items) + table = pa.Table.from_batches(parse_stac_items_to_arrow(items)) field_meta = table.schema.field("geometry").metadata assert field_meta[b"ARROW:extension:name"] == b"geoarrow.wkb" assert json.loads(field_meta[b"ARROW:extension:metadata"])["crs"]["id"] == { From a1a23e054c06e411d95571914c40e150f24d7bf7 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 22 May 2024 12:57:08 +0200 Subject: [PATCH 10/11] fix error in refactoring --- stac_geoparquet/arrow/_from_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_geoparquet/arrow/_from_arrow.py b/stac_geoparquet/arrow/_from_arrow.py index 48b80a6..c06b0ad 100644 --- a/stac_geoparquet/arrow/_from_arrow.py +++ b/stac_geoparquet/arrow/_from_arrow.py @@ -133,7 +133,7 @@ def _lower_properties_from_top_level(batch: pa.RecordBatch) -> pa.RecordBatch: properties_column_fields.append(batch.schema.field(column_idx)) struct_arr = pa.StructArray.from_arrays( - batch.columns, fields=properties_column_fields + batch.select(properties_column_names).columns, fields=properties_column_fields ) return batch.drop_columns(properties_column_names).append_column( From 268aab6d10cdf4e05c65674e19ea3cded2e8dc58 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 22 May 2024 13:04:00 +0200 Subject: [PATCH 11/11] Add schema inference with limit --- stac_geoparquet/arrow/_schema/models.py | 8 ++++++-- stac_geoparquet/arrow/_to_arrow.py | 6 +++++- stac_geoparquet/arrow/_util.py | 6 +++++- stac_geoparquet/json_reader.py | 9 ++++++--- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py index 191f67d..06fcbd2 100644 --- a/stac_geoparquet/arrow/_schema/models.py +++ b/stac_geoparquet/arrow/_schema/models.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Any, Dict, Iterable, Sequence, Union +from typing import Any, Dict, Iterable, Optional, Sequence, Union import pyarrow as pa @@ -30,6 +30,7 @@ def update_from_json( path: Union[str, Path, Iterable[Union[str, Path]]], *, 
chunk_size: int = 65536, + limit: Optional[int] = None, ) -> None: """ Update this inferred schema from one or more newline-delimited JSON STAC files. @@ -37,8 +38,11 @@ def update_from_json( Args: path: One or more paths to files with STAC items. chunk_size: The chunk size to load into memory at a time. Defaults to 65536. + + Other args: + limit: The maximum number of JSON Items to use for schema inference """ - for batch in read_json_chunked(path, chunk_size=chunk_size): + for batch in read_json_chunked(path, chunk_size=chunk_size, limit=limit): self.update_from_items(batch) def update_from_items(self, items: Sequence[Dict[str, Any]]) -> None: diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py index bbad7a3..b99dcec 100644 --- a/stac_geoparquet/arrow/_to_arrow.py +++ b/stac_geoparquet/arrow/_to_arrow.py @@ -62,6 +62,7 @@ def parse_stac_ndjson_to_arrow( *, chunk_size: int = 65536, schema: Optional[pa.Schema] = None, + limit: Optional[int] = None, ) -> Iterator[pa.RecordBatch]: """ Convert one or more newline-delimited JSON STAC files to a generator of Arrow @@ -78,6 +79,9 @@ def parse_stac_ndjson_to_arrow( In this case, there will be two full passes over the input data: one to infer a common schema across all data and another to read the data. + Other args: + limit: The maximum number of JSON Items to use for schema inference + Yields: Arrow RecordBatch with a single chunk of Item data. """ @@ -85,7 +89,7 @@ def parse_stac_ndjson_to_arrow( # to perform schema resolution. if schema is None: inferred_schema = InferredSchema() - inferred_schema.update_from_json(path, chunk_size=chunk_size) + inferred_schema.update_from_json(path, chunk_size=chunk_size, limit=limit) yield from parse_stac_ndjson_to_arrow( path, chunk_size=chunk_size, schema=inferred_schema ) diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py index 6fbacc0..5390af5 100644 --- a/stac_geoparquet/arrow/_util.py +++ b/stac_geoparquet/arrow/_util.py @@ -28,14 +28,18 @@ def update_batch_schema( def batched_iter( - lst: Iterable[Dict[str, Any]], n: int + lst: Iterable[Dict[str, Any]], n: int, *, limit: Optional[int] = None ) -> Iterable[Sequence[Dict[str, Any]]]: """Yield successive n-sized chunks from iterable.""" if n < 1: raise ValueError("n must be at least one") it = iter(lst) + count = 0 while batch := tuple(islice(it, n)): yield batch + count += len(batch) + if limit and count >= limit: + return def stac_items_to_arrow( diff --git a/stac_geoparquet/json_reader.py b/stac_geoparquet/json_reader.py index fdd4788..62589d7 100644 --- a/stac_geoparquet/json_reader.py +++ b/stac_geoparquet/json_reader.py @@ -1,7 +1,7 @@ """Return an iterator of items from an ndjson, a json array of items, or a featurecollection of items.""" from pathlib import Path -from typing import Any, Dict, Iterable, Sequence, Union +from typing import Any, Dict, Iterable, Optional, Sequence, Union import orjson @@ -39,7 +39,10 @@ def read_json( def read_json_chunked( - path: Union[str, Path, Iterable[Union[str, Path]]], chunk_size: int + path: Union[str, Path, Iterable[Union[str, Path]]], + chunk_size: int, + *, + limit: Optional[int] = None, ) -> Iterable[Sequence[Dict[str, Any]]]: """Read from a JSON or NDJSON file in chunks of `chunk_size`.""" - return batched_iter(read_json(path), chunk_size) + return batched_iter(read_json(path), chunk_size, limit=limit)
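
After this series, `parse_stac_items_to_arrow` and `parse_stac_ndjson_to_arrow` yield `pyarrow.RecordBatch`es instead of returning a single `pyarrow.Table`, JSON/NDJSON input is streamed through `read_json` / `read_json_chunked`, and schema inference for NDJSON runs as a separate first pass (optionally capped with `limit`). Below is a minimal usage sketch of that end state, not part of the patches themselves: the `items.ndjson` path is hypothetical, and it assumes `parse_stac_ndjson_to_parquet` is re-exported from `stac_geoparquet.arrow` alongside the functions shown in the README example above.

    import pyarrow as pa
    import requests
    import stac_geoparquet.arrow

    items = requests.get(
        "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items"
    ).json()["features"]

    # parse_stac_items_to_arrow now yields RecordBatches; collect them into a Table.
    table = pa.Table.from_batches(stac_geoparquet.arrow.parse_stac_items_to_arrow(items))
    stac_geoparquet.arrow.to_parquet(table, "items.parquet")

    # Stream newline-delimited JSON (or a JSON list / FeatureCollection) straight to
    # GeoParquet. With schema=None the input is read twice: once to infer a unified
    # schema, once to write batches of `chunk_size` items.
    stac_geoparquet.arrow.parse_stac_ndjson_to_parquet(  # assumed re-export from the package
        "items.ndjson",  # hypothetical input path
        "items2.parquet",
        chunk_size=65536,
        schema=None,
    )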