Dataset.as_field for nesting; release
liquidcarbon committed Nov 7, 2024
1 parent df2a7ef commit 90bfd62
Showing 4 changed files with 126 additions and 58 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -274,7 +274,7 @@ Have you ever stared at a bunch of numbers and had no clue what they represented?
## Future

- pipification? I envision that even when ~complete, this package will remain just one file
- nested data (WIP)
- nested data - WIP, but this already works:

```python
# nested datasets serialize as dicts(structs)
@@ -284,7 +284,7 @@ class User(af.Dataset):
attrs = af.VectorObject("user attributes")
class Task(af.Dataset):
created_ts = af.ScalarF64("created timestamp")
user = af.VectorObject("user")
user = User.as_field("vector")
hours = af.VectorI16("time worked (hours)")
u1 = User(name="Alice", attrs=["adorable", "agreeable"])
u2 = User(name="Brent", attrs=["bland", "broke"])
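# --- illustrative continuation (not part of this commit's README) ---
# a sketch of building the nested Task; nested values are passed here as dicts
# via the .dict property, though User instances may also be accepted directly
t = Task(created_ts=1730.99, user=[u1.dict, u2.dict], hours=[3, 5])
t.df  # flat pandas DataFrame; the "user" column holds one dict (struct) per row
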
122 changes: 70 additions & 52 deletions affinity.py
@@ -1,5 +1,6 @@
__doc__ = """
Module for creating well-documented datasets, with types and annotations.
Version 0.7.0
"""

from dataclasses import dataclass, field
@@ -31,15 +32,15 @@ def try_import(module: str) -> object | None:
pa = try_import("pyarrow")
pq = try_import("pyarrow.parquet")

try:
# streaming to/from cloud
pass
except Exception:
pass


@dataclass
class Location:
"""Dataclass for writing the data.
Used in a special attribute `Dataset.LOCATION` that is attached to
all Datasets via metaclass by default, or can be set explicitly.
"""

folder: str | Path = field(default=Path("."))
file: str | Path = field(default="export.csv")
partition_by: List[str] = field(default_factory=list)
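    # e.g. (illustrative sketch, not in this diff): overriding the location on a
    # dataset class, loosely mirroring the partitioning test in test_affinity.py:
    #   class MyData(Dataset):
    #       v1 = VectorObject("partition key")
    #       LOCATION = Location(folder="s3://mybucket/affinity", partition_by=["v1"])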
@@ -70,7 +71,8 @@ def __set__(self, instance, values):
)
except OverflowError as e:
raise e
except Exception as e: # leaving blanket exception to troubleshoot
except Exception as e:
# blanket exception to troubleshoot; thus far it has never come up
raise e
if instance is None:
self._values = _values
@@ -90,7 +92,7 @@ def factory(cls, dtype, array_class=pd.Series, cls_name=None):
"""Factory method for creating typed classes.
I failed to convince IDEs that factory-made classes are not of "DescriptorType"
and reverted to explicit class declarations.
and reverted to explicit class declarations. Keeping for posterity.
"""

class DescriptorType(cls):
@@ -168,61 +170,34 @@ def __repr__(cls) -> str:
for k, v in cls.__dict__.items():
if isinstance(v, Descriptor):
_lines.append(f"{k}: {v.info}")
if isinstance(v, DatasetMeta):
_lines.append(f"{k}: {v.__doc__}")
return "\n".join(_lines)


class BaseDataset(metaclass=DatasetMeta):
class DatasetBase(metaclass=DatasetMeta):
"""Parent class and classmethods for main Dataset class."""

@classmethod
def as_field(cls, as_type: str | Scalar | Vector = Vector, comment: str = ""):
_comment = comment or cls.__doc__
if as_type in (Scalar, "scalar"):
return ScalarObject(_comment)
elif as_type in (Vector, "vector"):
return VectorObject(_comment)
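        # e.g. (illustrative): for a `class User(Dataset)` whose docstring is "Users.",
        # User.as_field("scalar") returns ScalarObject("Users.") and
        # User.as_field(Vector) returns VectorObject("Users."); see test_as_field.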

@classmethod
def get_scalars(cls):
return {k: None for k, v in cls.__dict__.items() if isinstance(v, Scalar)}

@classmethod
def get_vectors(cls):
return {k: None for k, v in cls.__dict__.items() if isinstance(v, Vector)}
return {k: v for k, v in cls.__dict__.items() if isinstance(v, Vector)}

@classmethod
def get_dict(cls):
return dict(cls())


class Dataset(BaseDataset):
"""Base class for typed, annotated datasets."""

def __init__(self, **fields: Scalar | Vector):
"""Create dataset, dynamically setting field values.
Vectors are initialized first, ensuring all are of equal length.
Scalars are filled in afterwards.
"""

self._vectors = self.__class__.get_vectors()
self._scalars = self.__class__.get_scalars()
if len(self._vectors) == 0 and len(self._scalars) == 0:
raise ValueError("no attributes defined in your dataset")
self.origin = {"created_ts": int(time() * 1000)}
_sizes = {}
for vector_name in self._vectors:
field_data = fields.get(vector_name)
setattr(self, vector_name, field_data)
_sizes[vector_name] = len(self.__dict__[vector_name])
if len(self._vectors) > 0:
self._max_size = max(_sizes.values())
if not all([self._max_size == v for v in _sizes.values()]):
raise ValueError(f"vectors must be of equal size: {_sizes}")
else:
self._max_size = 1

for scalar_name in self._scalars:
_value = fields.get(scalar_name)
_scalar = self.__class__.__dict__[scalar_name]
_scalar.value = _value
_vector_from_scalar = Vector.from_scalar(_scalar, self._max_size)
setattr(self, scalar_name, _vector_from_scalar)
self._scalars[scalar_name] = _value

if len(self.origin) == 1: # only after direct __init__
self.origin["source"] = "manual"

@classmethod
def build(cls, query=None, dataframe=None, **kwargs):
"""Build from DuckDB query or a dataframe.
@@ -260,6 +235,47 @@ def from_sql(cls, query: str, **kwargs):
instance.origin["source"] += f"\nquery:\n{query}"
return instance


class Dataset(DatasetBase):
"""Base class for typed, annotated datasets."""

def __init__(self, **fields: Scalar | Vector):
"""Create dataset, dynamically setting field values.
Vectors are initialized first, ensuring all are of equal length.
Scalars are filled in afterwards.
"""
self._vectors = self.__class__.get_vectors()
self._scalars = self.__class__.get_scalars()
if len(self._vectors) + len(self._scalars) == 0:
raise ValueError("no attributes defined in your dataset")
self.origin = {"created_ts": int(time() * 1000)}
_sizes = {}
for vector_name in self._vectors:
_values = fields.get(vector_name)
setattr(self, vector_name, _values)
_sizes[vector_name] = len(self.__dict__[vector_name])
if len(self._vectors) > 0:
self._max_size = max(_sizes.values())
if not all([self._max_size == v for v in _sizes.values()]):
raise ValueError(f"vectors must be of equal size: {_sizes}")
else:
self._max_size = 1

for scalar_name in self._scalars:
_value = fields.get(scalar_name)
_scalar = self.__class__.__dict__[scalar_name]
_scalar.value = _value
_vector_from_scalar = Vector.from_scalar(_scalar, self._max_size)
setattr(self, scalar_name, _vector_from_scalar)
if isinstance(_value, Dataset):
self._scalars[scalar_name] = _value.dict
else:
self._scalars[scalar_name] = _value

if len(self.origin) == 1: # only after direct __init__
self.origin["source"] = "manual"

def __eq__(self, other):
return self.df.equals(other.df)

@@ -282,7 +298,8 @@ def __repr__(self):

@property
def shape(self):
return len(self), len(self._vectors) + len(self._scalars)
n_cols = len(self._vectors) + len(self._scalars)
return len(self), n_cols

@property
def dict(self) -> dict:
@@ -348,6 +365,7 @@ def sql(self, query, **replacements):
This is what **replacements kwargs are for.
By default, df=self.df (pandas dataframe) is used.
The registered views persist across queries. RAM impact TBD.
TODO: add registrations to `from_sql`
"""
if replacements.get("df") is None:
duckdb.register("df", self.df)
@@ -404,7 +422,7 @@ def to_parquet(self, path, engine="duckdb", **kwargs):
raise NotImplementedError
return path

def partition(self) -> Tuple[List[str], List[str], List[str], List[BaseDataset]]:
def partition(self) -> Tuple[List[str], List[str], List[str], List[DatasetBase]]:
"""Path and format constructed from `LOCATION` attribute.
Variety of outputs is helpful when populating cloud warehouses,
@@ -431,7 +449,7 @@ def partition(self) -> Tuple[List[str], List[str], List[str], List[BaseDataset]]
return names, folders, filepaths, datasets
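    # e.g. (illustrative, mirroring the partitioning test in test_affinity.py): with
    # folder "s3://mybucket/affinity" and partition_by=["v1"], the returned filepaths
    # look like "s3://mybucket/affinity/v1=a/export.csv".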


### Typed scalars and vectors
### Typed scalars and vectors. TODO: datetimes?


class ScalarObject(Scalar):
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "affinity"
version = "0.6.0"
version = "0.7.0"
description = "Module for creating well-documented datasets, with types and annotations."
authors = [
{ name = "Alex Kislukhin" }
56 changes: 53 additions & 3 deletions test_affinity.py
@@ -131,7 +131,7 @@ class aScalarDataset(af.Dataset):
data = aScalarDataset(v1=0, v2=float("-inf"))
assert not data.v1[-1]
assert data.v2.dtype == np.float32
assert data._scalars == dict(v1=0, v2=float("-inf"))
# assert data._scalars == dict(v1=0, v2=float("-inf"))
empty_scalar_dataset_df = aScalarDataset().df
assert empty_scalar_dataset_df.dtypes.to_list() == [np.bool_, np.float32]

@@ -186,6 +186,18 @@ class aDatasetOnlyVector(af.Dataset):
assert data1 == data2


def test_as_field():
class aDataset(af.Dataset):
"""Comment."""

v1 = af.VectorBool("True or False")

as_scalar_field = aDataset.as_field("scalar")
assert str(as_scalar_field) == "ScalarObject <class 'object'> # Comment."
as_vector_field = aDataset.as_field("vector")
assert "VectorObject <class 'object'> # Comment. | len 0" in str(as_vector_field)


def test_from_dataframe():
class aDataset(af.Dataset):
v1 = af.VectorBool("")
@@ -407,14 +419,52 @@ class bDataset(af.Dataset):
assert filepaths[0] == "s3://mybucket/affinity/v1=a/export.csv"


def test_flatten_nested_dataset():
def test_flatten_nested_scalar_datasets():
class User(af.Dataset):
"""Users."""

name = af.ScalarObject("username")
attrs = af.VectorObject("user attributes")

class Task(af.Dataset):
"""Tasks with nested Users."""

created_ts = af.VectorF64("created timestamp")
user = User.as_field(af.Scalar) # becomes af.ScalarObject("Users.")
hours = af.VectorI16("time worked (hours)")

u1 = User(name="Alice", attrs=["adorable", "agreeable"])
t1 = Task(created_ts=[0.1, 23.45], user=u1, hours=[3, 5])

expected_dict = {
"created_ts": [0.1, 23.45],
"user": {"name": "Alice", "attrs": ["adorable", "agreeable"]},
"hours": [3, 5],
}
assert t1.model_dump() == expected_dict
flattened_df = pd.DataFrame(
{
"created_ts": {0: 0.1, 1: 23.45},
"name": {0: "Alice", 1: "Alice"},
"attrs": {0: ["adorable", "agreeable"], 1: ["adorable", "agreeable"]},
"hours": {0: 3, 1: 5},
}
)
assert t1.flatten(prefix=False).to_dict() == flattened_df.to_dict()


def test_flatten_nested_vector_datasets():
class User(af.Dataset):
"""Users."""

name = af.ScalarObject("username")
attrs = af.VectorObject("user attributes")

class Task(af.Dataset):
"""Tasks with nested Users."""

created_ts = af.ScalarF64("created timestamp")
user = af.VectorObject("user")
user = User.as_field(af.Vector) # becomes af.VectorObject("Users.")
hours = af.VectorI16("time worked (hours)")

u1 = User(name="Alice", attrs=["adorable", "agreeable"])
