Allow writing either GeoParquet 1.0 or GeoParquet 1.1 schema metadata (#61)

* Write GeoParquet with defined schema version

* fix writing delta lake

* Rename to Keyword Args

* refactor constants into separate file

* export constants in public api

* Separate check for writing covering to GeoParquet meta

* fix mypy type check

* fix typing

* reduce copy
kylebarron authored Jun 20, 2024
1 parent 732188e commit 768df20
Showing 6 changed files with 99 additions and 16 deletions.
5 changes: 5 additions & 0 deletions stac_geoparquet/arrow/__init__.py
@@ -4,4 +4,9 @@
     stac_table_to_items,
     stac_table_to_ndjson,
 )
+from ._constants import (
+    DEFAULT_JSON_CHUNK_SIZE,
+    DEFAULT_PARQUET_SCHEMA_VERSION,
+    SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+)
 from ._to_parquet import parse_stac_ndjson_to_parquet, to_parquet
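
Illustrative only (not part of the diff): with the constants re-exported from the package root, callers can reference them instead of hard-coding values.

    # Sketch: import the newly exported constants from the public API.
    from stac_geoparquet.arrow import (
        DEFAULT_JSON_CHUNK_SIZE,
        DEFAULT_PARQUET_SCHEMA_VERSION,
    )

    print(DEFAULT_JSON_CHUNK_SIZE)         # 65536
    print(DEFAULT_PARQUET_SCHEMA_VERSION)  # 1.1.0

The Literal alias SUPPORTED_PARQUET_SCHEMA_VERSIONS is exported alongside them for use in type annotations.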
5 changes: 3 additions & 2 deletions stac_geoparquet/arrow/_api.py
@@ -7,6 +7,7 @@
 import pyarrow as pa
 
 from stac_geoparquet.arrow._batch import StacArrowBatch, StacJsonBatch
+from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
 from stac_geoparquet.arrow._schema.models import InferredSchema
 from stac_geoparquet.arrow._util import batched_iter
 from stac_geoparquet.json_reader import read_json_chunked
@@ -55,7 +56,7 @@ def parse_stac_items_to_arrow(
 def parse_stac_ndjson_to_arrow(
     path: str | Path | Iterable[str | Path],
     *,
-    chunk_size: int = 65536,
+    chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
     schema: pa.Schema | None = None,
     limit: int | None = None,
 ) -> Iterator[pa.RecordBatch]:
@@ -74,7 +75,7 @@ def parse_stac_ndjson_to_arrow(
             In this case, there will be two full passes over the input data: one to
             infer a common schema across all data and another to read the data.
 
-    Other args:
+    Keyword Args:
         limit: The maximum number of JSON Items to use for schema inference
 
     Yields:
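
Illustrative usage sketch of the updated signature (not part of the diff); "items.ndjson" is a placeholder path, and collecting the whole iterator into a Table is just one way to consume it.

    # Sketch: stream STAC NDJSON into Arrow RecordBatches, then collect them.
    import pyarrow as pa

    from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow

    batches = parse_stac_ndjson_to_arrow("items.ndjson", chunk_size=10_000)
    table = pa.Table.from_batches(list(batches))  # every batch shares one schema
    print(table.num_rows)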
10 changes: 10 additions & 0 deletions stac_geoparquet/arrow/_constants.py
@@ -0,0 +1,10 @@
+from typing import Literal
+
+DEFAULT_JSON_CHUNK_SIZE = 65536
+"""The default chunk size to use for reading JSON into memory."""
+
+SUPPORTED_PARQUET_SCHEMA_VERSIONS = Literal["1.0.0", "1.1.0"]
+"""A Literal type with the supported GeoParquet schema versions."""
+
+DEFAULT_PARQUET_SCHEMA_VERSION: SUPPORTED_PARQUET_SCHEMA_VERSIONS = "1.1.0"
+"""The default GeoParquet schema version written to file."""
32 changes: 29 additions & 3 deletions stac_geoparquet/arrow/_delta_lake.py
@@ -8,7 +8,12 @@
 from deltalake import write_deltalake
 
 from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow
-from stac_geoparquet.arrow._to_parquet import create_geoparquet_metadata
+from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
+from stac_geoparquet.arrow._to_parquet import (
+    DEFAULT_PARQUET_SCHEMA_VERSION,
+    SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+    create_geoparquet_metadata,
+)
 
 if TYPE_CHECKING:
     from deltalake import DeltaTable
@@ -18,17 +23,38 @@ def parse_stac_ndjson_to_delta_lake(
     input_path: str | Path | Iterable[str | Path],
     table_or_uri: str | Path | DeltaTable,
     *,
-    chunk_size: int = 65536,
+    chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
     schema: pa.Schema | None = None,
     limit: int | None = None,
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
     **kwargs: Any,
 ) -> None:
+    """Convert one or more newline-delimited JSON STAC files to Delta Lake
+
+    Args:
+        input_path: One or more paths to files with STAC items.
+        table_or_uri: A path to the output Delta Lake table
+
+    Args:
+        chunk_size: The chunk size to use for reading JSON into memory. Defaults to
+            65536.
+        schema: The schema to represent the input STAC data. Defaults to None, in which
+            case the schema will first be inferred via a full pass over the input data.
+            In this case, there will be two full passes over the input data: one to
+            infer a common schema across all data and another to read the data and
+            iteratively convert to GeoParquet.
+        limit: The maximum number of JSON records to convert.
+        schema_version: GeoParquet specification version; if not provided will default
+            to latest supported version.
+    """
     batches_iter = parse_stac_ndjson_to_arrow(
         input_path, chunk_size=chunk_size, schema=schema, limit=limit
     )
     first_batch = next(batches_iter)
     schema = first_batch.schema.with_metadata(
-        create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
+        create_geoparquet_metadata(
+            pa.Table.from_batches([first_batch]), schema_version=schema_version
+        )
     )
     combined_iter = itertools.chain([first_batch], batches_iter)
     write_deltalake(table_or_uri, combined_iter, schema=schema, engine="rust", **kwargs)
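
Illustrative usage sketch of the new keyword (not part of the diff); both paths are placeholders and the deltalake package must be installed.

    from stac_geoparquet.arrow._delta_lake import parse_stac_ndjson_to_delta_lake

    parse_stac_ndjson_to_delta_lake(
        "items.ndjson",          # newline-delimited STAC Items (placeholder path)
        "out/stac-delta-table",  # Delta Lake table location (placeholder path)
        schema_version="1.0.0",  # write GeoParquet 1.0.0 metadata, without the bbox covering
    )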
5 changes: 3 additions & 2 deletions stac_geoparquet/arrow/_schema/models.py
@@ -6,6 +6,7 @@
 import pyarrow as pa
 
 from stac_geoparquet.arrow._batch import StacJsonBatch
+from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE
 from stac_geoparquet.json_reader import read_json_chunked
 
 
@@ -31,7 +32,7 @@ def update_from_json(
         self,
         path: str | Path | Iterable[str | Path],
         *,
-        chunk_size: int = 65536,
+        chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
         limit: int | None = None,
     ) -> None:
         """
@@ -41,7 +42,7 @@ def update_from_json(
             path: One or more paths to files with STAC items.
             chunk_size: The chunk size to load into memory at a time. Defaults to 65536.
 
-        Other args:
+        Keyword Args:
             limit: The maximum number of JSON Items to use for schema inference
         """
         for batch in read_json_chunked(path, chunk_size=chunk_size, limit=limit):
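
Illustrative sketch (not part of the diff), assuming InferredSchema can be constructed with no arguments; the path is a placeholder.

    from stac_geoparquet.arrow._schema.models import InferredSchema

    inferred = InferredSchema()
    # Scan up to 1000 Items in chunks to build a unified Arrow schema.
    inferred.update_from_json("items.ndjson", chunk_size=8192, limit=1000)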
58 changes: 49 additions & 9 deletions stac_geoparquet/arrow/_to_parquet.py
@@ -8,6 +8,11 @@
 import pyarrow.parquet as pq
 
 from stac_geoparquet.arrow._api import parse_stac_ndjson_to_arrow
+from stac_geoparquet.arrow._constants import (
+    DEFAULT_JSON_CHUNK_SIZE,
+    DEFAULT_PARQUET_SCHEMA_VERSION,
+    SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+)
 from stac_geoparquet.arrow._crs import WGS84_CRS_JSON
 from stac_geoparquet.arrow._schema.models import InferredSchema
 
@@ -16,72 +21,97 @@ def parse_stac_ndjson_to_parquet(
     input_path: str | Path | Iterable[str | Path],
     output_path: str | Path,
     *,
-    chunk_size: int = 65536,
+    chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
     schema: pa.Schema | InferredSchema | None = None,
     limit: int | None = None,
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
     **kwargs: Any,
 ) -> None:
     """Convert one or more newline-delimited JSON STAC files to GeoParquet
 
     Args:
         input_path: One or more paths to files with STAC items.
         output_path: A path to the output Parquet file.
 
     Keyword Args:
         chunk_size: The chunk size. Defaults to 65536.
         schema: The schema to represent the input STAC data. Defaults to None, in which
             case the schema will first be inferred via a full pass over the input data.
             In this case, there will be two full passes over the input data: one to
             infer a common schema across all data and another to read the data and
             iteratively convert to GeoParquet.
+        limit: The maximum number of JSON records to convert.
+        schema_version: GeoParquet specification version; if not provided will default
+            to latest supported version.
     """
 
     batches_iter = parse_stac_ndjson_to_arrow(
         input_path, chunk_size=chunk_size, schema=schema, limit=limit
     )
     first_batch = next(batches_iter)
     schema = first_batch.schema.with_metadata(
-        create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
+        create_geoparquet_metadata(
+            pa.Table.from_batches([first_batch]), schema_version=schema_version
+        )
     )
     with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
         writer.write_batch(first_batch)
         for batch in batches_iter:
             writer.write_batch(batch)
 
 
-def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
+def to_parquet(
+    table: pa.Table,
+    where: Any,
+    *,
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS = DEFAULT_PARQUET_SCHEMA_VERSION,
+    **kwargs: Any,
+) -> None:
     """Write an Arrow table with STAC data to GeoParquet
 
     This writes metadata compliant with GeoParquet 1.1.
 
     Args:
         table: The table to write to Parquet
         where: The destination for saving.
+
+    Keyword Args:
+        schema_version: GeoParquet specification version; if not provided will default
+            to latest supported version.
     """
     metadata = table.schema.metadata or {}
-    metadata.update(create_geoparquet_metadata(table))
+    metadata.update(create_geoparquet_metadata(table, schema_version=schema_version))
     table = table.replace_schema_metadata(metadata)
 
     pq.write_table(table, where, **kwargs)
 
 
-def create_geoparquet_metadata(table: pa.Table) -> dict[bytes, bytes]:
+def create_geoparquet_metadata(
+    table: pa.Table,
+    *,
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+) -> dict[bytes, bytes]:
     # TODO: include bbox of geometries
     column_meta = {
         "encoding": "WKB",
         # TODO: specify known geometry types
         "geometry_types": [],
         "crs": WGS84_CRS_JSON,
         "edges": "planar",
-        "covering": {
+    }
+
+    if schema_version_has_bbox_mapping(schema_version):
+        column_meta["covering"] = {
             "bbox": {
                 "xmin": ["bbox", "xmin"],
                 "ymin": ["bbox", "ymin"],
                 "xmax": ["bbox", "xmax"],
                 "ymax": ["bbox", "ymax"],
             }
-        },
-    }
+        }
 
     geo_meta: dict[str, Any] = {
-        "version": "1.1.0-dev",
+        "version": schema_version,
         "columns": {"geometry": column_meta},
         "primary_column": "geometry",
     }
@@ -101,3 +131,13 @@ def create_geoparquet_metadata(table: pa.Table) -> dict[bytes, bytes]:
     }
 
     return {b"geo": json.dumps(geo_meta).encode("utf-8")}
+
+
+def schema_version_has_bbox_mapping(
+    schema_version: SUPPORTED_PARQUET_SCHEMA_VERSIONS,
+) -> bool:
+    """
+    Return true if this GeoParquet schema version supports bounding box covering
+    metadata.
+    """
+    return int(schema_version.split(".")[1]) >= 1
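
Illustrative end-to-end sketch of the new keyword on the public writer (not part of the diff); paths are placeholders.

    import pyarrow.parquet as pq

    from stac_geoparquet.arrow import parse_stac_ndjson_to_parquet

    # Write GeoParquet 1.0.0 metadata; the bbox "covering" entry is skipped
    # because schema_version_has_bbox_mapping("1.0.0") is False.
    parse_stac_ndjson_to_parquet(
        "items.ndjson",      # placeholder input path
        "items_v1.parquet",  # placeholder output path
        schema_version="1.0.0",
    )

    # Omitting schema_version writes the default, currently "1.1.0",
    # which does include the bbox covering.
    geo = pq.read_schema("items_v1.parquet").metadata[b"geo"]
    print(geo)  # JSON bytes with "version": "1.0.0"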
