From 1853d970272e70808db1ac9f6cf1ea1a2771ac71 Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Fri, 14 Jun 2024 16:27:17 -0400
Subject: [PATCH 1/9] Write GeoParquet with defined schema version

---
 stac_geoparquet/arrow/_to_parquet.py | 43 ++++++++++++++++++++++------
 1 file changed, 34 insertions(+), 9 deletions(-)

diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py
index bce3d96..73a63dc 100644
--- a/stac_geoparquet/arrow/_to_parquet.py
+++ b/stac_geoparquet/arrow/_to_parquet.py
@@ -2,7 +2,7 @@

 import json
 from pathlib import Path
-from typing import Any, Iterable
+from typing import Any, Iterable, Literal

 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -19,6 +19,7 @@ def parse_stac_ndjson_to_parquet(
     chunk_size: int = 65536,
     schema: pa.Schema | InferredSchema | None = None,
     limit: int | None = None,
+    schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
     **kwargs: Any,
 ) -> None:
     """Convert one or more newline-delimited JSON STAC files to GeoParquet
@@ -26,12 +27,17 @@ def parse_stac_ndjson_to_parquet(
     Args:
         input_path: One or more paths to files with STAC items.
         output_path: A path to the output Parquet file.
+
+    Other args:
         chunk_size: The chunk size. Defaults to 65536.
         schema: The schema to represent the input STAC data. Defaults to None, in
            which case the schema will first be inferred via a full pass over the
            input data. In this case, there will be two full passes over the input
            data: one to infer a common schema across all data and another to read
            the data and iteratively convert to GeoParquet.
+        limit: The maximum number of JSON records to convert.
+        schema_version: GeoParquet specification version; if not provided will default
+            to latest supported version.
     """

     batches_iter = parse_stac_ndjson_to_arrow(
@@ -39,7 +45,9 @@ def parse_stac_ndjson_to_parquet(
     )
     first_batch = next(batches_iter)
     schema = first_batch.schema.with_metadata(
-        create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
+        create_geoparquet_metadata(
+            pa.Table.from_batches([first_batch]), schema_version=schema_version
+        )
     )
     with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
         writer.write_batch(first_batch)
@@ -47,7 +55,13 @@ def parse_stac_ndjson_to_parquet(
         writer.write_batch(batch)


-def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
+def to_parquet(
+    table: pa.Table,
+    where: Any,
+    *,
+    schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
+    **kwargs: Any,
+) -> None:
     """Write an Arrow table with STAC data to GeoParquet

     This writes metadata compliant with GeoParquet 1.1.
@@ -55,15 +69,23 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
     Args:
         table: The table to write to Parquet
         where: The destination for saving.
+
+    Other args:
+        schema_version: GeoParquet specification version; if not provided will default
+            to latest supported version.
     """
     metadata = table.schema.metadata or {}
-    metadata.update(create_geoparquet_metadata(table))
+    metadata.update(create_geoparquet_metadata(table, schema_version=schema_version))
     table = table.replace_schema_metadata(metadata)

     pq.write_table(table, where, **kwargs)


-def create_geoparquet_metadata(table: pa.Table) -> dict[bytes, bytes]:
+def create_geoparquet_metadata(
+    table: pa.Table,
+    *,
+    schema_version: Literal["1.0.0", "1.1.0"],
+) -> dict[bytes, bytes]:
     # TODO: include bbox of geometries
     column_meta = {
         "encoding": "WKB",
@@ -71,17 +93,20 @@ def create_geoparquet_metadata(table: pa.Table) -> dict[bytes, bytes]:
         "geometry_types": [],
         "crs": WGS84_CRS_JSON,
         "edges": "planar",
-        "covering": {
+    }
+
+    if int(schema_version.split(".")[1]) >= 1:
+        column_meta["covering"] = {
             "bbox": {
                 "xmin": ["bbox", "xmin"],
                 "ymin": ["bbox", "ymin"],
                 "xmax": ["bbox", "xmax"],
                 "ymax": ["bbox", "ymax"],
             }
-        },
-    }
+        }
+
     geo_meta: dict[str, Any] = {
-        "version": "1.1.0-dev",
+        "version": schema_version,
         "columns": {"geometry": column_meta},
         "primary_column": "geometry",
     }

From 1f64850d153930e8e833d23f0b2c38942beb0ecc Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Fri, 14 Jun 2024 16:38:36 -0400
Subject: [PATCH 2/9] fix writing delta lake

---
 stac_geoparquet/arrow/_delta_lake.py | 25 +++++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/stac_geoparquet/arrow/_delta_lake.py b/stac_geoparquet/arrow/_delta_lake.py
index d45c4bc..bb1f500 100644
--- a/stac_geoparquet/arrow/_delta_lake.py
+++ b/stac_geoparquet/arrow/_delta_lake.py
@@ -2,7 +2,7 @@

 import itertools
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Iterable
+from typing import TYPE_CHECKING, Any, Iterable, Literal

 import pyarrow as pa
 from deltalake import write_deltalake
@@ -21,14 +21,35 @@ def parse_stac_ndjson_to_delta_lake(
     chunk_size: int = 65536,
     schema: pa.Schema | None = None,
     limit: int | None = None,
+    schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
     **kwargs: Any,
 ) -> None:
+    """Convert one or more newline-delimited JSON STAC files to Delta Lake
+
+    Args:
+        input_path: One or more paths to files with STAC items.
+        table_or_uri: A path to the output Delta Lake table
+
+    Args:
+        chunk_size: The chunk size to use for reading JSON into memory. Defaults to
+            65536.
+        schema: The schema to represent the input STAC data. Defaults to None, in which
+            case the schema will first be inferred via a full pass over the input data.
+            In this case, there will be two full passes over the input data: one to
+            infer a common schema across all data and another to read the data and
+            iteratively convert to GeoParquet.
+        limit: The maximum number of JSON records to convert.
+        schema_version: GeoParquet specification version; if not provided will default
+            to latest supported version.
+    """
     batches_iter = parse_stac_ndjson_to_arrow(
         input_path, chunk_size=chunk_size, schema=schema, limit=limit
     )
     first_batch = next(batches_iter)
     schema = first_batch.schema.with_metadata(
-        create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
+        create_geoparquet_metadata(
+            pa.Table.from_batches([first_batch]), schema_version=schema_version
+        )
     )
     combined_iter = itertools.chain([first_batch], batches_iter)
     write_deltalake(table_or_uri, combined_iter, schema=schema, engine="rust", **kwargs)

From b0bb1cd8d98c5f0c26c54b090e7c8a3d24811e14 Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Tue, 18 Jun 2024 11:03:40 -0400
Subject: [PATCH 3/9] Rename to Keyword Args

---
 stac_geoparquet/arrow/_api.py           | 2 +-
 stac_geoparquet/arrow/_schema/models.py | 2 +-
 stac_geoparquet/arrow/_to_parquet.py    | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/stac_geoparquet/arrow/_api.py b/stac_geoparquet/arrow/_api.py
index 165893e..35e4a00 100644
--- a/stac_geoparquet/arrow/_api.py
+++ b/stac_geoparquet/arrow/_api.py
@@ -74,7 +74,7 @@ def parse_stac_ndjson_to_arrow(
             In this case, there will be two full passes over the input data: one to
             infer a common schema across all data and another to read the data.

-    Other args:
+    Keyword Args:
         limit: The maximum number of JSON Items to use for schema inference

     Yields:
diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py
index 4043ada..a5b6373 100644
--- a/stac_geoparquet/arrow/_schema/models.py
+++ b/stac_geoparquet/arrow/_schema/models.py
@@ -41,7 +41,7 @@ def update_from_json(
             path: One or more paths to files with STAC items.
             chunk_size: The chunk size to load into memory at a time. Defaults to
                 65536.
-        Other args:
+        Keyword Args:
             limit: The maximum number of JSON Items to use for schema inference
         """
         for batch in read_json_chunked(path, chunk_size=chunk_size, limit=limit):
diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py
index 73a63dc..b3e6afe 100644
--- a/stac_geoparquet/arrow/_to_parquet.py
+++ b/stac_geoparquet/arrow/_to_parquet.py
@@ -28,7 +28,7 @@ def parse_stac_ndjson_to_parquet(
         input_path: One or more paths to files with STAC items.
         output_path: A path to the output Parquet file.

-    Other args:
+    Keyword Args:
         chunk_size: The chunk size. Defaults to 65536.
         schema: The schema to represent the input STAC data. Defaults to None, in
@@ -70,7 +70,7 @@ def to_parquet(
         table: The table to write to Parquet
         where: The destination for saving.

-    Other args:
+    Keyword Args:
         schema_version: GeoParquet specification version; if not provided will default
            to latest supported version.
     """

From 7fdb56ac42d368e3c5220690d6fd1aced4a33efd Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 20 Jun 2024 16:14:15 -0400
Subject: [PATCH 4/9] refactor constants into separate file

---
 stac_geoparquet/arrow/_api.py           |  3 ++-
 stac_geoparquet/arrow/_constants.py     | 10 ++++++++++
 stac_geoparquet/arrow/_delta_lake.py    | 13 +++++++++----
 stac_geoparquet/arrow/_schema/models.py |  3 ++-
 stac_geoparquet/arrow/_to_parquet.py    | 15 ++++++++++-----
 5 files changed, 33 insertions(+), 11 deletions(-)
 create mode 100644 stac_geoparquet/arrow/_constants.py

diff --git a/stac_geoparquet/arrow/_api.py b/stac_geoparquet/arrow/_api.py
index 35e4a00..06e2fe2 100644
--- a/stac_geoparquet/arrow/_api.py
+++ b/stac_geoparquet/arrow/_api.py
@@ -7,6 +7,7 @@
 import pyarrow as pa

 from stac_geoparquet.arrow._batch import StacArrowBatch, StacJsonBatch
+from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
 from stac_geoparquet.arrow._schema.models import InferredSchema
 from stac_geoparquet.arrow._util import batched_iter
 from stac_geoparquet.json_reader import read_json_chunked
@@ -55,7 +56,7 @@ def parse_stac_items_to_arrow(
 def parse_stac_ndjson_to_arrow(
     path: str | Path | Iterable[str | Path],
     *,
-    chunk_size: int = 65536,
+    chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
     schema: pa.Schema | None = None,
     limit: int | None = None,
 ) -> Iterator[pa.RecordBatch]:
diff --git a/stac_geoparquet/arrow/_constants.py b/stac_geoparquet/arrow/_constants.py
new file mode 100644
index 0000000..5001bef
--- /dev/null
+++ b/stac_geoparquet/arrow/_constants.py
@@ -0,0 +1,10 @@
+from typing import Literal
+
+DEFAULT_JSON_CHUNK_SIZE = 65536
+"""The default chunk size to use for reading JSON into memory."""
+
+SUPPORTED_PARQUET_SCHEMA_VERSIONS = Literal["1.0.0", "1.1.0"]
+"""A Literal type with the supported GeoParquet schema versions."""
+
+DEFAULT_PARQUET_SCHEMA_VERSION = "1.1.0"
+"""The default GeoParquet schema version written to file."""
diff --git a/stac_geoparquet/arrow/_delta_lake.py b/stac_geoparquet/arrow/_delta_lake.py
index bb1f500..35ee065 100644
--- a/stac_geoparquet/arrow/_delta_lake.py
+++ b/stac_geoparquet/arrow/_delta_lake.py
@@ -2,13 +2,18 @@

 import itertools
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Iterable, Literal
+from typing import TYPE_CHECKING, Any, Iterable

 import pyarrow as pa
 from deltalake import write_deltalake

 from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow
-from stac_geoparquet.arrow._to_parquet import create_geoparquet_metadata
+from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
+from stac_geoparquet.arrow._to_parquet import (
+    DEFAULT_PARQUET_SCHEMA_VERSION,
+    SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+    create_geoparquet_metadata,
+)

 if TYPE_CHECKING:
     from deltalake import DeltaTable
@@ -18,10 +23,10 @@ def parse_stac_ndjson_to_delta_lake(
     input_path: str | Path | Iterable[str | Path],
     table_or_uri: str | Path | DeltaTable,
     *,
-    chunk_size: int = 65536,
+    chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
     schema: pa.Schema | None = None,
     limit: int | None = None,
-    schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
     **kwargs: Any,
 ) -> None:
     """Convert one or more newline-delimited JSON STAC files to Delta Lake
diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py
index a5b6373..5d19f48 100644
--- a/stac_geoparquet/arrow/_schema/models.py
+++ b/stac_geoparquet/arrow/_schema/models.py
@@ -6,6 +6,7 @@
 import pyarrow as pa

 from stac_geoparquet.arrow._batch import StacJsonBatch
+from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
 from stac_geoparquet.json_reader import read_json_chunked


@@ -31,7 +32,7 @@ def update_from_json(
         self,
         path: str | Path | Iterable[str | Path],
         *,
-        chunk_size: int = 65536,
+        chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
         limit: int | None = None,
     ) -> None:
         """
diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py
index b3e6afe..80be3cb 100644
--- a/stac_geoparquet/arrow/_to_parquet.py
+++ b/stac_geoparquet/arrow/_to_parquet.py
@@ -2,12 +2,17 @@

 import json
 from pathlib import Path
-from typing import Any, Iterable, Literal
+from typing import Any, Iterable

 import pyarrow as pa
 import pyarrow.parquet as pq

 from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow
+from stac_geoparquet.arrow._constants import (
+    DEFAULT_JSON_CHUNK_SIZE,
+    DEFAULT_PARQUET_SCHEMA_VERSION,
+    SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+)
 from stac_geoparquet.arrow._crs import WGS84_CRS_JSON
 from stac_geoparquet.arrow._schema.models import InferredSchema

@@ -16,10 +21,10 @@ def parse_stac_ndjson_to_parquet(
     input_path: str | Path | Iterable[str | Path],
     output_path: str | Path,
     *,
-    chunk_size: int = 65536,
+    chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
     schema: pa.Schema | InferredSchema | None = None,
     limit: int | None = None,
-    schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
     **kwargs: Any,
 ) -> None:
     """Convert one or more newline-delimited JSON STAC files to GeoParquet
@@ -59,7 +64,7 @@ def to_parquet(
     table: pa.Table,
     where: Any,
     *,
-    schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
     **kwargs: Any,
 ) -> None:
     """Write an Arrow table with STAC data to GeoParquet
@@ -84,7 +89,7 @@ def to_parquet(
 def create_geoparquet_metadata(
     table: pa.Table,
     *,
-    schema_version: Literal["1.0.0", "1.1.0"],
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS,
 ) -> dict[bytes, bytes]:
     # TODO: include bbox of geometries
     column_meta = {

From ff065d2b344b78c1ef572d2d55ec47a8f1d3bbdd Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 20 Jun 2024 16:14:37 -0400
Subject: [PATCH 5/9] export constants in public api

---
 stac_geoparquet/arrow/__init__.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/stac_geoparquet/arrow/__init__.py b/stac_geoparquet/arrow/__init__.py
index c88deb2..41f1874 100644
--- a/stac_geoparquet/arrow/__init__.py
+++ b/stac_geoparquet/arrow/__init__.py
@@ -4,4 +4,9 @@
     stac_table_to_items,
     stac_table_to_ndjson,
 )
+from ._constants import (
+    DEFAULT_JSON_CHUNK_SIZE,
+    DEFAULT_PARQUET_SCHEMA_VERSION,
+    SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+)
 from ._to_parquet import parse_stac_ndjson_to_parquet, to_parquet

From 165ff144309f70d756ed14f3dd3d97bac463964c Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 20 Jun 2024 16:16:41 -0400
Subject: [PATCH 6/9] Separate check for writing covering to GeoParquet meta

---
 stac_geoparquet/arrow/_to_parquet.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py
index 80be3cb..0108c17 100644
--- a/stac_geoparquet/arrow/_to_parquet.py
+++ b/stac_geoparquet/arrow/_to_parquet.py
@@ -100,7 +100,7 @@ def create_geoparquet_metadata(
         "edges": "planar",
     }

-    if int(schema_version.split(".")[1]) >= 1:
+    if schema_version_has_bbox_mapping(schema_version):
         column_meta["covering"] = {
             "bbox": {
                 "xmin": ["bbox", "xmin"],
@@ -131,3 +131,11 @@ def create_geoparquet_metadata(
     }

     return {b"geo": json.dumps(geo_meta).encode("utf-8")}
+
+
+def schema_version_has_bbox_mapping(schema_version: str) -> bool:
+    """
+    Return true if this GeoParquet schema version supports bounding box covering
+    metadata.
+    """
+    return int(schema_version.split(".")[1]) >= 1

From 497d1f83d3da06707c16e3cefdcb945e96f379db Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 20 Jun 2024 16:19:54 -0400
Subject: [PATCH 7/9] fix mypy type check

---
 stac_geoparquet/arrow/_to_parquet.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py
index 0108c17..85115cc 100644
--- a/stac_geoparquet/arrow/_to_parquet.py
+++ b/stac_geoparquet/arrow/_to_parquet.py
@@ -133,7 +133,9 @@ def create_geoparquet_metadata(
     return {b"geo": json.dumps(geo_meta).encode("utf-8")}


-def schema_version_has_bbox_mapping(schema_version: str) -> bool:
+def schema_version_has_bbox_mapping(
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+) -> bool:
     """
     Return true if this GeoParquet schema version supports bounding box covering
     metadata.

From 11451636cd1798dbb548118d900be883f09f2762 Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 20 Jun 2024 16:39:15 -0400
Subject: [PATCH 8/9] fix typing

---
 stac_geoparquet/arrow/_constants.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stac_geoparquet/arrow/_constants.py b/stac_geoparquet/arrow/_constants.py
index 5001bef..1ed4c50 100644
--- a/stac_geoparquet/arrow/_constants.py
+++ b/stac_geoparquet/arrow/_constants.py
@@ -6,5 +6,5 @@
 SUPPORTED_PARQUET_SCHEMA_VERSIONS = Literal["1.0.0", "1.1.0"]
 """A Literal type with the supported GeoParquet schema versions."""

-DEFAULT_PARQUET_SCHEMA_VERSION = "1.1.0"
+DEFAULT_PARQUET_SCHEMA_VERSION: Literal["1.0.0", "1.1.0"] = "1.1.0"
 """The default GeoParquet schema version written to file."""

From ba52e22de546ecdd0f8818f1cf901428f306eb6a Mon Sep 17 00:00:00 2001
From: Kyle Barron
Date: Thu, 20 Jun 2024 16:39:36 -0400
Subject: [PATCH 9/9] reduce copy

---
 stac_geoparquet/arrow/_constants.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stac_geoparquet/arrow/_constants.py b/stac_geoparquet/arrow/_constants.py
index 1ed4c50..86b1ae1 100644
--- a/stac_geoparquet/arrow/_constants.py
+++ b/stac_geoparquet/arrow/_constants.py
@@ -6,5 +6,5 @@
 SUPPORTED_PARQUET_SCHEMA_VERSIONS = Literal["1.0.0", "1.1.0"]
 """A Literal type with the supported GeoParquet schema versions."""

-DEFAULT_PARQUET_SCHEMA_VERSION: Literal["1.0.0", "1.1.0"] = "1.1.0"
+DEFAULT_PARQUET_SCHEMA_VERSION: SUPPORTED_PARQUET_SCHEMA_VERSIONS = "1.1.0"
 """The default GeoParquet schema version written to file."""
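
Usage sketch: the snippet below illustrates how the schema_version keyword introduced by this series is driven from the public stac_geoparquet.arrow API once PATCH 5/9 re-exports the writer functions and constants. It is a minimal illustration, not part of the patches: the module alias, the items.ndjson input, and the output filenames are placeholder names.

    import stac_geoparquet.arrow as stac_arrow  # alias is illustrative only

    # Default behavior after PATCH 4/9: metadata is written for GeoParquet 1.1.0
    # (DEFAULT_PARQUET_SCHEMA_VERSION), so create_geoparquet_metadata() includes
    # the bbox "covering" mapping in the "geo" metadata key.
    stac_arrow.parse_stac_ndjson_to_parquet(
        "items.ndjson",        # placeholder input path
        "items_1_1.parquet",   # placeholder output path
        schema_version=stac_arrow.DEFAULT_PARQUET_SCHEMA_VERSION,  # "1.1.0"
    )

    # Pinning GeoParquet 1.0.0 instead: schema_version_has_bbox_mapping("1.0.0")
    # is False, so the "covering" entry is omitted and the "geo" metadata reports
    # version "1.0.0".
    stac_arrow.parse_stac_ndjson_to_parquet(
        "items.ndjson",
        "items_1_0.parquet",
        schema_version="1.0.0",
    )

to_parquet() accepts the same keyword-only schema_version argument when an Arrow table is already in hand, and parse_stac_ndjson_to_delta_lake() mirrors it for Delta Lake output, so all three writers stay pinned to one of the SUPPORTED_PARQUET_SCHEMA_VERSIONS.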