Parquet Writer Implementation (#167)

WillAyd authored Sep 6, 2022
1 parent 966ca46 commit 0c2aec8

Showing 7 changed files with 143 additions and 47 deletions.
18 changes: 9 additions & 9 deletions .github/workflows/pythonpackage.yml
@@ -33,7 +33,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install pandas tableauhyperapi
+          python -m pip install pandas tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -59,7 +59,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install pandas tableauhyperapi
+          python -m pip install pandas tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -85,7 +85,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install pandas tableauhyperapi
+          python -m pip install pandas tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -106,7 +106,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install "pandas<1.3.0" tableauhyperapi
+          python -m pip install "pandas<1.3.0" tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -128,7 +128,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install "pandas<1.3.0" tableauhyperapi
+          python -m pip install "pandas<1.3.0" tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -149,7 +149,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install "pandas<1.3.0" tableauhyperapi
+          python -m pip install "pandas<1.3.0" tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -170,7 +170,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install pandas tableauhyperapi
+          python -m pip install pandas tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -191,7 +191,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install pandas tableauhyperapi
+          python -m pip install pandas tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
@@ -213,7 +213,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-          python -m pip install pandas tableauhyperapi
+          python -m pip install pandas tableauhyperapi pyarrow
      - name: Build extensions
        run: |
          python setup.py build_ext --inplace
3 changes: 2 additions & 1 deletion doc/source/api.rst
@@ -9,6 +9,7 @@ API Reference
:param database: Name / location of the Hyper file to write to.
:param table: Table to write to. Must be supplied as a keyword argument.
:param hyper_process: A `HyperProcess` in case you want to spawn it by yourself. Optional. Must be supplied as a keyword argument.
+:param use_parquet: Use a temporary parquet file to write into the Hyper database, which typically will yield better performance. Boolean, default False


.. py:function:: frame_from_hyper(source: Union[str, pathlib.Path, tab_api.Connection], *, table: Union[str, tableauhyperapi.Name, tableauhyperapi.TableName], hyper_process: Optional[HyperProcess], use_float_na: bool = False) -> pd.DataFrame:
@@ -29,7 +30,7 @@ API Reference
:param dict_of_frames: A dictionary whose keys are valid table identifiers and values are dataframes
:param database: Name / location of the Hyper file to write to.
:param hyper_process: A `HyperProcess` in case you want to spawn it by yourself. Optional. Must be supplied as a keyword argument.
-
+:param use_parquet: Use a temporary parquet file to write into the Hyper database, which typically will yield better performance. Boolean, default False

.. py:function:: frames_from_hyper(source: Union[str, pathlib.Path, tab_api.Connection], *, hyper_process: Optional[HyperProcess], use_float_na: bool = False) -> Dict[tableauhyperapi.TableName, pd.DataFrame]:
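For illustration, here is how the documented flag looks from the caller's side — a minimal usage sketch, with the frame contents and file name as placeholders:

import pandas as pd
import pantab

df = pd.DataFrame({"int64": [1, 2, 3], "object": ["a", "b", "c"]})

# Default path: row-by-row insert through the Hyper Inserter API
pantab.frame_to_hyper(df, "example.hyper", table="test")

# Opt-in path: stage a temporary parquet file and let Hyper bulk-load it,
# which typically will yield better performance
pantab.frame_to_hyper(df, "example.hyper", table="test", use_parquet=True)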
3 changes: 2 additions & 1 deletion environment.yml
@@ -1,13 +1,14 @@
name: pantab-dev
channels:
-  - defaults
+  - conda-forge
dependencies:
  - black
  - flake8
  - isort
  - mypy
  - pandas
  - pip
+  - pyarrow
  - python
  - pytest
  - sphinx
84 changes: 67 additions & 17 deletions pantab/_writer.py
@@ -1,4 +1,5 @@
import itertools
+import os
import pathlib
import shutil
import tempfile
@@ -110,6 +111,7 @@ def _insert_frame(
    connection: tab_api.Connection,
    table: pantab_types.TableType,
    table_mode: str,
+    use_parquet: bool,
) -> None:
    _validate_table_mode(table_mode)

@@ -145,22 +147,58 @@ def _insert_frame(

    connection.catalog.create_table_if_not_exists(table_def)

-    null_mask = np.ascontiguousarray(pd.isnull(df))
-    # Special handling for conversions
-    df, dtypes = _maybe_convert_timedelta(df)
-
-    with tab_api.Inserter(connection, table_def) as inserter:
-        if compat.PANDAS_130:
-            libpantab.write_to_hyper(df, null_mask, inserter._buffer, dtypes)
-        else:
-            libpantab.write_to_hyper_legacy(
-                df.itertuples(index=False, name=None),
-                null_mask,
-                inserter._buffer,
-                df.shape[1],
-                dtypes,
-            )
-        inserter.execute()
+    if not use_parquet:
+        null_mask = np.ascontiguousarray(pd.isnull(df))
+        # Special handling for conversions
+        df, dtypes = _maybe_convert_timedelta(df)
+
+        with tab_api.Inserter(connection, table_def) as inserter:
+            if compat.PANDAS_130:
+                libpantab.write_to_hyper(df, null_mask, inserter._buffer, dtypes)
+            else:
+                libpantab.write_to_hyper_legacy(
+                    df.itertuples(index=False, name=None),
+                    null_mask,
+                    inserter._buffer,
+                    df.shape[1],
+                    dtypes,
+                )
+            inserter.execute()
+    else:
+        if any(x.name == "timedelta64[ns]" for x in df.dtypes):
+            raise ValueError(
+                "Writing timedelta values with use_parquet=True is not yet supported."
+            )
+
+        import pyarrow as pa
+        import pyarrow.parquet as pq
+
+        tbl = pa.Table.from_pandas(df)
+        non_nullable = {"int16", "int32", "int64", "bool"}
+        new_fields = []
+        for field, dtype in zip(tbl.schema, df.dtypes):
+            if dtype.name in non_nullable:
+                new_fields.append(
+                    pa.field(name=field.name, type=field.type, nullable=False)
+                )
+            else:
+                new_fields.append(field)
+
+        new_schema = pa.schema(new_fields)
+        tbl = tbl.cast(new_schema)
+
+        # Windows can't read and write a NamedTemporaryFile in one pass
+        with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
+            pq.write_table(tbl, tmp)
+
+        connection.execute_command(
+            f"COPY {table} FROM '{tmp.name}' WITH (FORMAT 'parquet')"
+        )
+
+        try:
+            os.unlink(tmp.name)
+        except FileNotFoundError:
+            pass


def frame_to_hyper(
@@ -170,9 +208,16 @@ def frame_to_hyper(
    table: pantab_types.TableType,
    table_mode: str = "w",
    hyper_process: Optional[tab_api.HyperProcess] = None,
+    use_parquet: bool = False,
) -> None:
    """See api.rst for documentation"""
-    frames_to_hyper({table: df}, database, table_mode, hyper_process=hyper_process)
+    frames_to_hyper(
+        {table: df},
+        database,
+        table_mode,
+        hyper_process=hyper_process,
+        use_parquet=use_parquet,
+    )


def frames_to_hyper(
@@ -181,6 +226,7 @@ def frames_to_hyper(
    table_mode: str = "w",
    *,
    hyper_process: Optional[tab_api.HyperProcess] = None,
+    use_parquet: bool = False,
) -> None:
    """See api.rst for documentation."""
    _validate_table_mode(table_mode)
@@ -196,7 +242,11 @@ def frames_to_hyper(
    ) as connection:
        for table, df in dict_of_frames.items():
            _insert_frame(
-                df, connection=connection, table=table, table_mode=table_mode
+                df,
+                connection=connection,
+                table=table,
+                table_mode=table_mode,
+                use_parquet=use_parquet,
            )

    # In Python 3.9+ we can just pass the path object, but due to bpo 32689
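The parquet branch above bypasses the row-based Inserter entirely: the frame is staged on disk with pyarrow and Hyper bulk-loads it with a COPY command. The following is a standalone sketch of the same technique against the raw tableauhyperapi — the table name, column, and file names are illustrative, and it skips the schema cast pantab performs so that non-nullable dtypes match its NOT NULL Hyper columns:

import os
import tempfile

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tableauhyperapi import (
    Connection,
    CreateMode,
    HyperProcess,
    SqlType,
    TableDefinition,
    TableName,
    Telemetry,
)

df = pd.DataFrame({"int64": [1, 2, 3]})

# Stage the frame as parquet; delete=False because Windows can't read and
# write a NamedTemporaryFile in one pass
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
    pq.write_table(pa.Table.from_pandas(df), tmp)

with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(
        hyper.endpoint, "example.hyper", CreateMode.CREATE_AND_REPLACE
    ) as connection:
        table = TableName("public", "test")
        table_def = TableDefinition(
            table, [TableDefinition.Column("int64", SqlType.big_int())]
        )
        connection.catalog.create_table_if_not_exists(table_def)
        # The same COPY command the new branch issues
        connection.execute_command(
            f"COPY {table} FROM '{tmp.name}' WITH (FORMAT 'parquet')"
        )

os.unlink(tmp.name)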
6 changes: 6 additions & 0 deletions pantab/tests/conftest.py
@@ -164,3 +164,9 @@ def table_name(request):
def datapath():
    """Location of data files in test folder."""
    return pathlib.Path(__file__).parent / "data"
+
+
+@pytest.fixture(params=[False, True])
+def use_parquet(request):
+    """Whether to use parquet for intermediate file storage."""
+    return request.param
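Because the fixture is parametrized, any test that requests use_parquet is collected once per value, so every writer test below exercises both code paths. A hypothetical test module showing the fan-out (pytest collects it as test_roundtrip[False] and test_roundtrip[True]; tmp_hyper is the repository's existing temporary-file fixture):

import pandas as pd

import pantab


def test_roundtrip(tmp_hyper, use_parquet):
    # Runs twice: once over the Inserter path, once over the parquet path
    df = pd.DataFrame({"int64": [1, 2, 3]})
    pantab.frame_to_hyper(df, tmp_hyper, table="test", use_parquet=use_parquet)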
74 changes: 56 additions & 18 deletions pantab/tests/test_writer.py
@@ -8,74 +8,106 @@
import pantab


-def test_bad_table_mode_raises(df, tmp_hyper):
+def test_bad_table_mode_raises(df, tmp_hyper, use_parquet):
+    if use_parquet:
+        df = df.drop(columns=["timedelta64"])
+
    msg = "'table_mode' must be either 'w' or 'a'"
    with pytest.raises(ValueError, match=msg):
-        pantab.frame_to_hyper(df, tmp_hyper, table="test", table_mode="x")
+        pantab.frame_to_hyper(
+            df, tmp_hyper, table="test", table_mode="x", use_parquet=use_parquet
+        )

    with pytest.raises(ValueError, match=msg):
        pantab.frames_to_hyper({"a": df}, tmp_hyper, table_mode="x")


-def test_append_mode_raises_column_mismatch(df, tmp_hyper, table_name):
-    pantab.frame_to_hyper(df, tmp_hyper, table=table_name)
+def test_append_mode_raises_column_mismatch(df, tmp_hyper, table_name, use_parquet):
+    if use_parquet:
+        df = df.drop(columns=["timedelta64"])
+
+    pantab.frame_to_hyper(df, tmp_hyper, table=table_name, use_parquet=use_parquet)

    df = df.drop("object", axis=1)
    msg = "^Mismatched column definitions:"
    with pytest.raises(TypeError, match=msg):
-        pantab.frame_to_hyper(df, tmp_hyper, table=table_name, table_mode="a")
+        pantab.frame_to_hyper(
+            df, tmp_hyper, table=table_name, table_mode="a", use_parquet=use_parquet
+        )


-def test_append_mode_raises_column_dtype_mismatch(df, tmp_hyper, table_name):
-    pantab.frame_to_hyper(df, tmp_hyper, table=table_name)
+def test_append_mode_raises_column_dtype_mismatch(
+    df, tmp_hyper, table_name, use_parquet
+):
+    if use_parquet:
+        df = df.drop(columns=["timedelta64"])
+
+    pantab.frame_to_hyper(df, tmp_hyper, table=table_name, use_parquet=use_parquet)

    df["int16"] = df["int16"].astype(np.int64)
    msg = "^Mismatched column definitions:"
    with pytest.raises(TypeError, match=msg):
-        pantab.frame_to_hyper(df, tmp_hyper, table=table_name, table_mode="a")
+        pantab.frame_to_hyper(
+            df, tmp_hyper, table=table_name, table_mode="a", use_parquet=use_parquet
+        )


-def test_failed_write_doesnt_overwrite_file(df, tmp_hyper, monkeypatch, table_mode):
-    pantab.frame_to_hyper(df, tmp_hyper, table="test", table_mode=table_mode)
+def test_failed_write_doesnt_overwrite_file(
+    df, tmp_hyper, monkeypatch, table_mode, use_parquet
+):
+    if use_parquet:
+        df = df.drop(columns=["timedelta64"])
+
+    pantab.frame_to_hyper(
+        df, tmp_hyper, table="test", table_mode=table_mode, use_parquet=use_parquet
+    )
    last_modified = tmp_hyper.stat().st_mtime

    # Let's patch the Inserter to fail on creation
    def failure(*args, **kwargs):
        raise ValueError("dummy failure")

-    monkeypatch.setattr(pantab._writer.tab_api, "Inserter", failure, raising=True)
+    if use_parquet:
+        pytest.skip("TODO: should figure out patching here")
+        # monkeypatch.setattr(pantab._writer.pq, "write_table", failure, raising=True)
+    else:
+        monkeypatch.setattr(pantab._writer.tab_api, "Inserter", failure, raising=True)

    # Try out our write methods
    with pytest.raises(ValueError, match="dummy failure"):
-        pantab.frame_to_hyper(df, tmp_hyper, table="test", table_mode=table_mode)
-        pantab.frames_to_hyper({"test": df}, tmp_hyper, table_mode=table_mode)
+        pantab.frame_to_hyper(
+            df, tmp_hyper, table="test", table_mode=table_mode, use_parquet=use_parquet
+        )
+        pantab.frames_to_hyper(
+            {"test": df}, tmp_hyper, table_mode=table_mode, use_parquet=use_parquet
+        )

    # Neither should update file stats
    assert last_modified == tmp_hyper.stat().st_mtime


-def test_duplicate_columns_raises(tmp_hyper):
+def test_duplicate_columns_raises(tmp_hyper, use_parquet):
    df = pd.DataFrame([[1, 1]], columns=[1, 1])
    with pytest.raises(
        tab_api.hyperexception.HyperException,
        match="column '1' specified more than once",
    ):
-        pantab.frame_to_hyper(df, tmp_hyper, table="foo")
+        pantab.frame_to_hyper(df, tmp_hyper, table="foo", use_parquet=use_parquet)

    with pytest.raises(
        tab_api.hyperexception.HyperException,
        match="column '1' specified more than once",
    ):
-        pantab.frames_to_hyper({"test": df}, tmp_hyper)
+        pantab.frames_to_hyper({"test": df}, tmp_hyper, use_parquet=use_parquet)


@pytest.mark.parametrize("dtype", ["UInt64", "datetime64[ns, US/Eastern]"])
-def test_unsupported_dtype_raises(dtype, tmp_hyper):
+def test_unsupported_dtype_raises(dtype, tmp_hyper, use_parquet):
    df = pd.DataFrame([[1]], dtype=dtype)

    msg = re.escape(f"Conversion of '{dtype}' dtypes not supported!")
    with pytest.raises(TypeError, match=msg):
-        pantab.frame_to_hyper(df, tmp_hyper, table="test")
+        pantab.frame_to_hyper(df, tmp_hyper, table="test", use_parquet=use_parquet)


def test_bad_value_gives_clear_message(tmp_hyper):
@@ -85,3 +117,9 @@ def test_bad_value_gives_clear_message(tmp_hyper):

    with pytest.raises(TypeError, match=msg):
        pantab.frame_to_hyper(df, tmp_hyper, table="test")
+
+
+def test_use_parquet_with_timedelta_raises(df, tmp_hyper):
+    msg = "Writing timedelta values with use_parquet=True is not yet supported."
+    with pytest.raises(ValueError, match=msg):
+        pantab.frame_to_hyper(df, tmp_hyper, table="test", use_parquet=True)
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[mypy]

-[mypy-numpy,pandas.*,pytest,setuptools,tableauhyperapi.*]
+[mypy-numpy,pandas.*,pytest,setuptools,tableauhyperapi.*,pyarrow.*]
ignore_missing_imports=True

[flake8]
