From f3e039367fc598cf91bdab58aab80331c1962270 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Thu, 31 Oct 2024 14:32:10 +0100 Subject: [PATCH] Remove unnecessary changes to reduce PR length --- quixstreams/sinks/base/__init__.py | 8 +++----- quixstreams/sinks/base/sink.py | 8 ++++---- quixstreams/sources/base/manager.py | 9 +++++---- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/quixstreams/sinks/base/__init__.py b/quixstreams/sinks/base/__init__.py index 5cddebee8..d63e19a0a 100644 --- a/quixstreams/sinks/base/__init__.py +++ b/quixstreams/sinks/base/__init__.py @@ -1,14 +1,12 @@ from .batch import SinkBatch from .exceptions import SinkBackpressureError -from .item import SinkItem from .manager import SinkManager from .sink import BaseSink, BatchingSink __all__ = ( - "BaseSink", - "BatchingSink", - "SinkBackpressureError", "SinkBatch", - "SinkItem", + "SinkBackpressureError", "SinkManager", + "BatchingSink", + "BaseSink", ) diff --git a/quixstreams/sinks/base/sink.py b/quixstreams/sinks/base/sink.py index da58ec516..2f052d359 100644 --- a/quixstreams/sinks/base/sink.py +++ b/quixstreams/sinks/base/sink.py @@ -1,6 +1,6 @@ import abc import logging -from typing import Any +from typing import Any, Dict, List, Tuple from quixstreams.models import HeaderValue from quixstreams.sinks.base.batch import SinkBatch @@ -35,7 +35,7 @@ def add( value: Any, key: Any, timestamp: int, - headers: list[tuple[str, HeaderValue]], + headers: List[Tuple[str, HeaderValue]], topic: str, partition: int, offset: int, @@ -71,7 +71,7 @@ class BatchingSink(BaseSink): batching sink. """ - _batches: dict[tuple[str, int], SinkBatch] + _batches: Dict[Tuple[str, int], SinkBatch] def __init__(self): self._batches = {} @@ -95,7 +95,7 @@ def add( value: Any, key: Any, timestamp: int, - headers: list[tuple[str, HeaderValue]], + headers: List[Tuple[str, HeaderValue]], topic: str, partition: int, offset: int, diff --git a/quixstreams/sources/base/manager.py b/quixstreams/sources/base/manager.py index ac87f6132..ba5c994ad 100644 --- a/quixstreams/sources/base/manager.py +++ b/quixstreams/sources/base/manager.py @@ -2,6 +2,7 @@ import signal import threading from pickle import PicklingError +from typing import List from quixstreams.logging import LOGGER_NAME, configure_logging from quixstreams.models import Topic @@ -27,7 +28,7 @@ def __init__(self, source): super().__init__() self.source: BaseSource = source - self._exceptions: list[Exception] = [] + self._exceptions: List[Exception] = [] self._started = False self._stopping = False @@ -167,7 +168,7 @@ class SourceManager: """ def __init__(self): - self.processes: list[SourceProcess] = [] + self.processes: List[SourceProcess] = [] def register(self, source: BaseSource): """ @@ -187,11 +188,11 @@ def register(self, source: BaseSource): return process @property - def sources(self) -> list[BaseSource]: + def sources(self) -> List[BaseSource]: return [process.source for process in self.processes] @property - def topics(self) -> list[Topic]: + def topics(self) -> List[Topic]: return [process.source.producer_topic for process in self.processes] def start_sources(self) -> None: