
Allow writing either GeoParquet 1.0 or GeoParquet 1.1 schema metadata #61

Merged · 10 commits · Jun 20, 2024
5 changes: 5 additions & 0 deletions stac_geoparquet/arrow/__init__.py
@@ -4,4 +4,9 @@
stac_table_to_items,
stac_table_to_ndjson,
)
from ._constants import (
DEFAULT_JSON_CHUNK_SIZE,
DEFAULT_PARQUET_SCHEMA_VERSION,
SUPPORTED_PARQUET_SCHEMA_VERSIONS,
)
from ._to_parquet import parse_stac_ndjson_to_parquet, to_parquet
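For context, a minimal sketch (not part of the diff) of how the newly re-exported constants could be inspected from the public `stac_geoparquet.arrow` namespace, assuming the package is installed from this branch:

```python
# Sketch only: inspect the constants now re-exported from stac_geoparquet.arrow.
# SUPPORTED_PARQUET_SCHEMA_VERSIONS is a typing.Literal alias, so get_args()
# recovers the accepted version strings at runtime.
from typing import get_args

from stac_geoparquet.arrow import (
    DEFAULT_JSON_CHUNK_SIZE,
    DEFAULT_PARQUET_SCHEMA_VERSION,
    SUPPORTED_PARQUET_SCHEMA_VERSIONS,
)

print(DEFAULT_JSON_CHUNK_SIZE)                      # 65536
print(DEFAULT_PARQUET_SCHEMA_VERSION)               # "1.1.0"
print(get_args(SUPPORTED_PARQUET_SCHEMA_VERSIONS))  # ("1.0.0", "1.1.0")
```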
5 changes: 3 additions & 2 deletions stac_geoparquet/arrow/_api.py
@@ -7,6 +7,7 @@
import pyarrow as pa

from stac_geoparquet.arrow._batch import StacArrowBatch, StacJsonBatch
from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
from stac_geoparquet.arrow._schema.models import InferredSchema
from stac_geoparquet.arrow._util import batched_iter
from stac_geoparquet.json_reader import read_json_chunked
@@ -55,7 +56,7 @@ def parse_stac_items_to_arrow(
def parse_stac_ndjson_to_arrow(
path: str | Path | Iterable[str | Path],
*,
chunk_size: int = 65536,
chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
schema: pa.Schema | None = None,
limit: int | None = None,
) -> Iterator[pa.RecordBatch]:
@@ -74,7 +75,7 @@ def parse_stac_ndjson_to_arrow(
In this case, there will be two full passes over the input data: one to
infer a common schema across all data and another to read the data.

Other args:
Keyword Args:
limit: The maximum number of JSON Items to use for schema inference

Yields:
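As a usage illustration (again not part of the diff), `parse_stac_ndjson_to_arrow` can be driven with an explicit `chunk_size`; the input path below is a placeholder, and the import uses the private `_api` module exactly as `_delta_lake.py` does:

```python
# Hypothetical usage; "items.ndjson" is a placeholder path.
import pyarrow as pa

from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow

# chunk_size now defaults to DEFAULT_JSON_CHUNK_SIZE (65536); override as needed.
batches = parse_stac_ndjson_to_arrow("items.ndjson", chunk_size=10_000, limit=1_000)
table = pa.Table.from_batches(batches)
```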
10 changes: 10 additions & 0 deletions stac_geoparquet/arrow/_constants.py
@@ -0,0 +1,10 @@
from typing import Literal

DEFAULT_JSON_CHUNK_SIZE = 65536
"""The default chunk size to use for reading JSON into memory."""

SUPPORTED_PARQUET_SCHEMA_VERSIONS = Literal["1.0.0", "1.1.0"]
"""A Literal type with the supported GeoParquet schema versions."""

DEFAULT_PARQUET_SCHEMA_VERSION: SUPPORTED_PARQUET_SCHEMA_VERSIONS = "1.1.0"
"""The default GeoParquet schema version written to file."""
32 changes: 29 additions & 3 deletions stac_geoparquet/arrow/_delta_lake.py
@@ -8,7 +8,12 @@
from deltalake import write_deltalake

from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow
from stac_geoparquet.arrow._to_parquet import create_geoparquet_metadata
from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
from stac_geoparquet.arrow._to_parquet import (
DEFAULT_PARQUET_SCHEMA_VERSION,
SUPPORTED_PARQUET_SCHEMA_VERSIONS,
create_geoparquet_metadata,
)

if TYPE_CHECKING:
from deltalake import DeltaTable
@@ -18,17 +23,38 @@ def parse_stac_ndjson_to_delta_lake(
input_path: str | Path | Iterable[str | Path],
table_or_uri: str | Path | DeltaTable,
*,
chunk_size: int = 65536,
chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
schema: pa.Schema | None = None,
limit: int | None = None,
schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
**kwargs: Any,
) -> None:
"""Convert one or more newline-delimited JSON STAC files to Delta Lake

Args:
input_path: One or more paths to files with STAC items.
table_or_uri: A path to the output Delta Lake table

Keyword Args:
chunk_size: The chunk size to use for reading JSON into memory. Defaults to
65536.
schema: The schema to represent the input STAC data. Defaults to None, in which
case the schema will first be inferred via a full pass over the input data.
In this case, there will be two full passes over the input data: one to
infer a common schema across all data and another to read the data and
iteratively convert to GeoParquet.
limit: The maximum number of JSON records to convert.
schema_version: GeoParquet specification version; if not provided, defaults
to the latest supported version.
"""
batches_iter = parse_stac_ndjson_to_arrow(
input_path, chunk_size=chunk_size, schema=schema, limit=limit
)
first_batch = next(batches_iter)
schema = first_batch.schema.with_metadata(
create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
create_geoparquet_metadata(
pa.Table.from_batches([first_batch]), schema_version=schema_version
)
)
combined_iter = itertools.chain([first_batch], batches_iter)
write_deltalake(table_or_uri, combined_iter, schema=schema, engine="rust", **kwargs)
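A hedged usage sketch of the new `schema_version` keyword on the Delta Lake writer; both paths are placeholders, and the call otherwise mirrors the signature in the diff:

```python
# Hypothetical call; "items.ndjson" and "out/stac_delta" are placeholder paths.
from stac_geoparquet.arrow._delta_lake import parse_stac_ndjson_to_delta_lake

# Write GeoParquet 1.0.0-style "geo" metadata instead of the 1.1.0 default.
parse_stac_ndjson_to_delta_lake(
    "items.ndjson",
    "out/stac_delta",
    schema_version="1.0.0",
)
```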
5 changes: 3 additions & 2 deletions stac_geoparquet/arrow/_schema/models.py
@@ -6,6 +6,7 @@
import pyarrow as pa

from stac_geoparquet.arrow._batch import StacJsonBatch
from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
from stac_geoparquet.json_reader import read_json_chunked


@@ -31,7 +32,7 @@ def update_from_json(
self,
path: str | Path | Iterable[str | Path],
*,
chunk_size: int = 65536,
chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
limit: int | None = None,
) -> None:
"""
@@ -41,7 +42,7 @@ def update_from_json(
path: One or more paths to files with STAC items.
chunk_size: The chunk size to load into memory at a time. Defaults to 65536.

Other args:
Keyword Args:
limit: The maximum number of JSON Items to use for schema inference
"""
for batch in read_json_chunked(path, chunk_size=chunk_size, limit=limit):
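For illustration, the schema-inference pass described in the docstrings could look roughly like this; the path is a placeholder and the no-argument constructor is an assumption based on how the class is used elsewhere in the package:

```python
# Sketch of the schema-inference pass; "items.ndjson" is a placeholder path and
# the no-argument constructor is an assumption, not confirmed by this diff.
from stac_geoparquet.arrow._schema.models import InferredSchema

inferred = InferredSchema()
# First pass: infer a common schema from (up to) the first 1,000 items.
inferred.update_from_json("items.ndjson", chunk_size=10_000, limit=1_000)
```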
58 changes: 49 additions & 9 deletions stac_geoparquet/arrow/_to_parquet.py
@@ -8,6 +8,11 @@
import pyarrow.parquet as pq

from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow
from stac_geoparquet.arrow._constants import (
DEFAULT_JSON_CHUNK_SIZE,
DEFAULT_PARQUET_SCHEMA_VERSION,
SUPPORTED_PARQUET_SCHEMA_VERSIONS,
)
from stac_geoparquet.arrow._crs import WGS84_CRS_JSON
from stac_geoparquet.arrow._schema.models import InferredSchema

@@ -16,72 +21,97 @@ def parse_stac_ndjson_to_parquet(
input_path: str | Path | Iterable[str | Path],
output_path: str | Path,
*,
chunk_size: int = 65536,
chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
schema: pa.Schema | InferredSchema | None = None,
limit: int | None = None,
schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
**kwargs: Any,
) -> None:
"""Convert one or more newline-delimited JSON STAC files to GeoParquet

Args:
input_path: One or more paths to files with STAC items.
output_path: A path to the output Parquet file.

Keyword Args:
chunk_size: The chunk size. Defaults to 65536.
schema: The schema to represent the input STAC data. Defaults to None, in which
case the schema will first be inferred via a full pass over the input data.
In this case, there will be two full passes over the input data: one to
infer a common schema across all data and another to read the data and
iteratively convert to GeoParquet.
limit: The maximum number of JSON records to convert.
schema_version: GeoParquet specification version; if not provided, defaults
to the latest supported version.
"""

batches_iter = parse_stac_ndjson_to_arrow(
input_path, chunk_size=chunk_size, schema=schema, limit=limit
)
first_batch = next(batches_iter)
schema = first_batch.schema.with_metadata(
create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
create_geoparquet_metadata(
pa.Table.from_batches([first_batch]), schema_version=schema_version
)
)
with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
writer.write_batch(first_batch)
for batch in batches_iter:
writer.write_batch(batch)


def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
def to_parquet(
table: pa.Table,
where: Any,
*,
schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
**kwargs: Any,
) -> None:
"""Write an Arrow table with STAC data to GeoParquet

This writes metadata compliant with GeoParquet 1.0 or 1.1, depending on schema_version.

Args:
table: The table to write to Parquet
where: The destination for saving.

Keyword Args:
schema_version: GeoParquet specification version; if not provided, defaults
to the latest supported version.
"""
metadata = table.schema.metadata or {}
metadata.update(create_geoparquet_metadata(table))
metadata.update(create_geoparquet_metadata(table, schema_version=schema_version))
table = table.replace_schema_metadata(metadata)

pq.write_table(table, where, **kwargs)


def create_geoparquet_metadata(table: pa.Table) -> dict[bytes, bytes]:
def create_geoparquet_metadata(
table: pa.Table,
*,
schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS,
) -> dict[bytes, bytes]:
# TODO: include bbox of geometries
column_meta = {
"encoding": "WKB",
# TODO: specify known geometry types
"geometry_types": [],
"crs": WGS84_CRS_JSON,
"edges": "planar",
"covering": {
}

if schema_version_has_bbox_mapping(schema_version):
column_meta["covering"] = {
"bbox": {
"xmin": ["bbox", "xmin"],
"ymin": ["bbox", "ymin"],
"xmax": ["bbox", "xmax"],
"ymax": ["bbox", "ymax"],
}
},
}
}

geo_meta: dict[str, Any] = {
"version": "1.1.0-dev",
"version": schema_version,
"columns": {"geometry": column_meta},
"primary_column": "geometry",
}
@@ -101,3 +131,13 @@ def create_geoparquet_metadata(table: pa.Table) -> dict[bytes, bytes]:
}

return {b"geo": json.dumps(geo_meta).encode("utf-8")}


def schema_version_has_bbox_mapping(
schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS,
) -> bool:
"""
Return true if this GeoParquet schema version supports bounding box covering
metadata.
"""
return int(schema_version.split(".")[1]) >= 1
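To make the effect of `schema_version` concrete, a hypothetical round trip could write 1.0.0 metadata and check that the bbox covering is omitted; file names are placeholders, and the assertions follow directly from `create_geoparquet_metadata` and `schema_version_has_bbox_mapping` above. Note that the helper keys off the minor version, so any future 1.x release at or above 1.1 would keep the covering metadata.

```python
# Hypothetical round trip; "items.ndjson" and "items_v1.parquet" are placeholders.
import json

import pyarrow.parquet as pq

from stac_geoparquet.arrow import parse_stac_ndjson_to_parquet

parse_stac_ndjson_to_parquet(
    "items.ndjson", "items_v1.parquet", schema_version="1.0.0"
)

geo = json.loads(pq.read_schema("items_v1.parquet").metadata[b"geo"])
assert geo["version"] == "1.0.0"                     # version string as written
assert "covering" not in geo["columns"]["geometry"]  # no bbox covering for 1.0.0
```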