Skip to content

Commit

Permalink
Build path using output_dir/topic/partition/key/offset.extension[.gz]…
Browse files Browse the repository at this point in the history
… pattern
  • Loading branch information
gwaramadze committed Oct 30, 2024
1 parent 143e4c8 commit 57149b8
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions quixstreams/sinks/community/file/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self, output_dir: str, format: Union[FormatName, Format]) -> None:
"""
super().__init__()
self._format = self._resolve_format(format)
self._output_dir = Path(output_dir)
self._output_dir = output_dir
logger.info(f"Files will be written to '{self._output_dir}'.")

def write(self, batch: SinkBatch) -> None:
Expand All @@ -73,17 +73,19 @@ def write(self, batch: SinkBatch) -> None:
messages_by_key[message.key].append(message)

_to_str = bytes.decode if isinstance(message.key, bytes) else str
path = [batch.topic, str(batch.partition)]

for key, messages in messages_by_key.items():
# Serialize messages for this key using the specified format
data = self._format.serialize(messages)

# Generate filename based on the key
safe_key = _UNSAFE_CHARACTERS_REGEX.sub("_", _to_str(key))

directory = self._output_dir / safe_key
# Generate directory based on topic / partition / key
directory = Path(self._output_dir)
for part in [*path, _to_str(key)]:
directory /= _UNSAFE_CHARACTERS_REGEX.sub("_", part)
directory.mkdir(parents=True, exist_ok=True)

# Generate filename based on the message offset
padded_offset = str(messages[0].offset).zfill(15)
file_path = directory / (padded_offset + self._format.file_extension)

Expand Down

0 comments on commit 57149b8

Please sign in to comment.