Skip to content

Commit

Permalink
Rename BatchFormat to just Format
Browse files: browse the repository at this point in the history
  • Loading branch information
gwaramadze committed Oct 29, 2024
1 parent d48e537 commit 3ec881a
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 26 deletions.
32 changes: 16 additions & 16 deletions quixstreams/sinks/community/file/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,33 +6,33 @@

from quixstreams.sinks import BatchingSink, SinkBatch

from .formats import BatchFormat, BytesFormat, JSONFormat, ParquetFormat
from .formats import BytesFormat, Format, JSONFormat, ParquetFormat

__all__ = [
"BatchFormat",
"BytesFormat",
"FileSink",
"InvalidFormatterError",
"InvalidFormatError",
"JSONFormat",
"ParquetFormat",
]

logger = logging.getLogger(__name__)

Format = Literal["bytes", "json", "parquet"]
FormatName = Literal["bytes", "json", "parquet"]

_FORMATTERS: dict[Format, BatchFormat] = {
"json": JSONFormat(),
_FORMATS: dict[FormatName, Format] = {
"bytes": BytesFormat(),
"json": JSONFormat(),
"parquet": ParquetFormat(),
}

_UNSAFE_CHARACTERS_REGEX = re.compile(r"[^a-zA-Z0-9 ._]")


class InvalidFormatterError(Exception):
class InvalidFormatError(Exception):
"""
Raised when formatter is specified incorrectly
Raised when format is specified incorrectly
"""


Expand All @@ -43,7 +43,7 @@ class FileSink(BatchingSink):
same key are appended to the same file where possible.
"""

def __init__(self, output_dir: str, format: Format) -> None:
def __init__(self, output_dir: str, format: Union[FormatName, Format]) -> None:
"""
Initializes the FileSink with the specified configuration.
Expand Down Expand Up @@ -90,15 +90,15 @@ def write(self, batch: SinkBatch) -> None:

logger.info(f"Wrote {len(messages)} records to file '{file_path}'.")

def _resolve_format(self, formatter: Union[Format, BatchFormat]) -> BatchFormat:
if isinstance(formatter, BatchFormat):
return formatter
elif formatter_obj := _FORMATTERS.get(formatter):
return formatter_obj
def _resolve_format(self, format: Union[FormatName, Format]) -> Format:
if isinstance(format, Format):
return format
elif format_obj := _FORMATS.get(format):
return format_obj

allowed_formats = ", ".join(Format.__args__)
raise InvalidFormatterError(
f'Invalid format name "{formatter}". '
raise InvalidFormatError(
f'Invalid format name "{format}". '
f"Allowed values: {allowed_formats}, "
f"or an instance of {BatchFormat.__class__.__name__}."
f"or an instance of {Format.__class__.__name__}."
)
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/file/formats/__init__.py
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
from .base import BatchFormat
from .base import Format
from .bytes import BytesFormat
from .json import JSONFormat
from .parquet import ParquetFormat

__all__ = ["BatchFormat", "BytesFormat", "JSONFormat", "ParquetFormat"]
__all__ = ["BytesFormat", "Format", "JSONFormat", "ParquetFormat"]
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/file/formats/base.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,14 +2,14 @@

from quixstreams.sinks.base import SinkItem

__all__ = ["BatchFormat"]
__all__ = ["Format"]


# TODO: Document the compatible topic formats for each formatter
# TODO: Check the types of the values before serializing


class BatchFormat(ABC):
class Format(ABC):
"""
Base class to format batches for File Sink.
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/file/formats/bytes.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,12 +2,12 @@

from quixstreams.sinks.base import SinkItem

from .base import BatchFormat
from .base import Format

__all__ = ["BytesFormat"]


class BytesFormat(BatchFormat):
class BytesFormat(Format):
"""
Bypass formatter to serialize
"""
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/file/formats/json.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,12 +6,12 @@

from quixstreams.sinks.base import SinkItem

from .base import BatchFormat
from .base import Format

__all__ = ["JSONFormat"]


class JSONFormat(BatchFormat):
class JSONFormat(Format):
"""
Serializes messages into JSON Lines format with optional gzip compression.
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/file/formats/parquet.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,12 +6,12 @@

from quixstreams.sinks.base import SinkItem

from .base import BatchFormat
from .base import Format

__all__ = ["ParquetFormat"]


class ParquetFormat(BatchFormat):
class ParquetFormat(Format):
# TODO: Docs
def __init__(
self,
Expand Down

0 comments on commit 3ec881a

Please sign in to comment.