Allow writing either GeoParquet 1.0 or GeoParquet 1.1 schema metadata #61

Merged
merged 10 commits on Jun 20, 2024
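In short, the writer functions gain a schema_version keyword ("1.0.0" or "1.1.0") that selects which GeoParquet schema metadata to write; only 1.1.0 adds the bbox "covering" block. A minimal usage sketch of the new keyword, assuming the function is re-exported from stac_geoparquet.arrow and with hypothetical file paths:

```python
from stac_geoparquet.arrow import parse_stac_ndjson_to_parquet

# Default in this PR: GeoParquet 1.0.0 schema metadata (no bbox covering).
parse_stac_ndjson_to_parquet("items.ndjson", "items_v10.parquet")

# Opt in to GeoParquet 1.1.0, which also records the bbox "covering" columns.
parse_stac_ndjson_to_parquet(
    "items.ndjson", "items_v11.parquet", schema_version="1.1.0"
)
```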
2 changes: 1 addition & 1 deletion stac_geoparquet/arrow/_api.py
@@ -74,7 +74,7 @@ def parse_stac_ndjson_to_arrow(
In this case, there will be two full passes over the input data: one to
infer a common schema across all data and another to read the data.

Other args:
Keyword Args:
limit: The maximum number of JSON Items to use for schema inference

Yields:
25 changes: 23 additions & 2 deletions stac_geoparquet/arrow/_delta_lake.py
@@ -2,7 +2,7 @@

import itertools
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable
from typing import TYPE_CHECKING, Any, Iterable, Literal

import pyarrow as pa
from deltalake import write_deltalake
@@ -21,14 +21,35 @@ def parse_stac_ndjson_to_delta_lake(
chunk_size: int = 65536,
schema: pa.Schema | None = None,
limit: int | None = None,
schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
**kwargs: Any,
) -> None:
"""Convert one or more newline-delimited JSON STAC files to Delta Lake

Args:
input_path: One or more paths to files with STAC items.
table_or_uri: A path to the output Delta Lake table

Keyword Args:
chunk_size: The chunk size to use for reading JSON into memory. Defaults to
65536.
schema: The schema to represent the input STAC data. Defaults to None, in which
case the schema will first be inferred via a full pass over the input data.
In this case, there will be two full passes over the input data: one to
infer a common schema across all data and another to read the data and
iteratively convert to GeoParquet.
limit: The maximum number of JSON records to convert.
schema_version: GeoParquet specification version; if not provided will default
to latest supported version.
"""
batches_iter = parse_stac_ndjson_to_arrow(
input_path, chunk_size=chunk_size, schema=schema, limit=limit
)
first_batch = next(batches_iter)
schema = first_batch.schema.with_metadata(
create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
create_geoparquet_metadata(
pa.Table.from_batches([first_batch]), schema_version=schema_version
)
)
combined_iter = itertools.chain([first_batch], batches_iter)
write_deltalake(table_or_uri, combined_iter, schema=schema, engine="rust", **kwargs)
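A hedged usage sketch of the Delta Lake writer with the new keyword; the input path and table URI are hypothetical, and the function is assumed to be re-exported from stac_geoparquet.arrow:

```python
from stac_geoparquet.arrow import parse_stac_ndjson_to_delta_lake

# Write a Delta Lake table whose Arrow schema carries GeoParquet 1.1.0 "geo" metadata.
parse_stac_ndjson_to_delta_lake(
    "items.ndjson",            # hypothetical newline-delimited STAC input
    "s3://bucket/stac-table",  # hypothetical Delta Lake table URI
    schema_version="1.1.0",
)
```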
2 changes: 1 addition & 1 deletion stac_geoparquet/arrow/_schema/models.py
@@ -41,7 +41,7 @@ def update_from_json(
path: One or more paths to files with STAC items.
chunk_size: The chunk size to load into memory at a time. Defaults to 65536.

Other args:
Keyword Args:
limit: The maximum number of JSON Items to use for schema inference
"""
for batch in read_json_chunked(path, chunk_size=chunk_size, limit=limit):
43 changes: 34 additions & 9 deletions stac_geoparquet/arrow/_to_parquet.py
@@ -2,7 +2,7 @@

import json
from pathlib import Path
from typing import Any, Iterable
from typing import Any, Iterable, Literal

import pyarrow as pa
import pyarrow.parquet as pq
@@ -19,69 +19,94 @@ def parse_stac_ndjson_to_parquet(
chunk_size: int = 65536,
schema: pa.Schema | InferredSchema | None = None,
limit: int | None = None,
schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
**kwargs: Any,
) -> None:
"""Convert one or more newline-delimited JSON STAC files to GeoParquet

Args:
input_path: One or more paths to files with STAC items.
output_path: A path to the output Parquet file.

Keyword Args:
chunk_size: The chunk size. Defaults to 65536.
schema: The schema to represent the input STAC data. Defaults to None, in which
case the schema will first be inferred via a full pass over the input data.
In this case, there will be two full passes over the input data: one to
infer a common schema across all data and another to read the data and
iteratively convert to GeoParquet.
limit: The maximum number of JSON records to convert.
schema_version: GeoParquet specification version; if not provided will default
to latest supported version.
"""

batches_iter = parse_stac_ndjson_to_arrow(
input_path, chunk_size=chunk_size, schema=schema, limit=limit
)
first_batch = next(batches_iter)
schema = first_batch.schema.with_metadata(
create_geoparquet_metadata(pa.Table.from_batches([first_batch]))
create_geoparquet_metadata(
pa.Table.from_batches([first_batch]), schema_version=schema_version
)
)
with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
writer.write_batch(first_batch)
for batch in batches_iter:
writer.write_batch(batch)


def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
def to_parquet(
table: pa.Table,
where: Any,
*,
schema_version: Literal["1.0.0", "1.1.0"] = "1.0.0",
**kwargs: Any,
) -> None:
"""Write an Arrow table with STAC data to GeoParquet

This writes metadata compliant with GeoParquet 1.0 or 1.1, depending on schema_version.

Args:
table: The table to write to Parquet
where: The destination for saving.

Keyword Args:
schema_version: GeoParquet specification version; if not provided will default
to latest supported version.
"""
metadata = table.schema.metadata or {}
metadata.update(create_geoparquet_metadata(table))
metadata.update(create_geoparquet_metadata(table, schema_version=schema_version))
table = table.replace_schema_metadata(metadata)

pq.write_table(table, where, **kwargs)
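For an already materialized Arrow table, the same keyword applies. A sketch assuming parse_stac_ndjson_to_arrow and to_parquet are re-exported from stac_geoparquet.arrow, with a hypothetical input path:

```python
import pyarrow as pa

from stac_geoparquet.arrow import parse_stac_ndjson_to_arrow, to_parquet

# Collect the record-batch iterator into one table, then pin the written
# metadata to GeoParquet 1.0.0 explicitly.
batches = parse_stac_ndjson_to_arrow("items.ndjson")
table = pa.Table.from_batches(batches)
to_parquet(table, "items.parquet", schema_version="1.0.0")
```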


def create_geoparquet_metadata(table: pa.Table) -> dict[bytes, bytes]:
def create_geoparquet_metadata(
table: pa.Table,
*,
schema_version: Literal["1.0.0", "1.1.0"],
) -> dict[bytes, bytes]:
# TODO: include bbox of geometries
column_meta = {
"encoding": "WKB",
# TODO: specify known geometry types
"geometry_types": [],
"crs": WGS84_CRS_JSON,
"edges": "planar",
"covering": {
}

if int(schema_version.split(".")[1]) >= 1:
column_meta["covering"] = {
"bbox": {
"xmin": ["bbox", "xmin"],
"ymin": ["bbox", "ymin"],
"xmax": ["bbox", "xmax"],
"ymax": ["bbox", "ymax"],
}
},
}
}

geo_meta: dict[str, Any] = {
"version": "1.1.0-dev",
"version": schema_version,
"columns": {"geometry": column_meta},
"primary_column": "geometry",
}
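Because only a minor version >= 1 triggers the covering block above, the file-level metadata differs between the two versions. A hedged sketch of inspecting it, assuming the metadata lands under the standard "geo" key required by the GeoParquet spec and that items_v11.parquet was written with schema_version="1.1.0" (the path is hypothetical):

```python
import json

import pyarrow.parquet as pq

# Read back the schema metadata written by to_parquet / parse_stac_ndjson_to_parquet.
geo = json.loads(pq.read_schema("items_v11.parquet").metadata[b"geo"])
print(geo["version"])                            # "1.1.0"
print("covering" in geo["columns"]["geometry"])  # True for 1.1.0, False for 1.0.0
```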