Skip to content

Commit

Permalink
Remove unnecessary changes to reduce PR length
Browse files Browse the repository at this point in the history
  • Loading branch information
gwaramadze committed Oct 31, 2024
1 parent 7a1da71 commit f3e0393
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
8 changes: 3 additions & 5 deletions quixstreams/sinks/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from .batch import SinkBatch
from .exceptions import SinkBackpressureError
from .item import SinkItem
from .manager import SinkManager
from .sink import BaseSink, BatchingSink

__all__ = (
"BaseSink",
"BatchingSink",
"SinkBackpressureError",
"SinkBatch",
"SinkItem",
"SinkBackpressureError",
"SinkManager",
"BatchingSink",
"BaseSink",
)
8 changes: 4 additions & 4 deletions quixstreams/sinks/base/sink.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import logging
from typing import Any
from typing import Any, Dict, List, Tuple

from quixstreams.models import HeaderValue
from quixstreams.sinks.base.batch import SinkBatch
Expand Down Expand Up @@ -35,7 +35,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: list[tuple[str, HeaderValue]],
headers: List[Tuple[str, HeaderValue]],
topic: str,
partition: int,
offset: int,
Expand Down Expand Up @@ -71,7 +71,7 @@ class BatchingSink(BaseSink):
batching sink.
"""

_batches: dict[tuple[str, int], SinkBatch]
_batches: Dict[Tuple[str, int], SinkBatch]

def __init__(self):
self._batches = {}
Expand All @@ -95,7 +95,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: list[tuple[str, HeaderValue]],
headers: List[Tuple[str, HeaderValue]],
topic: str,
partition: int,
offset: int,
Expand Down
9 changes: 5 additions & 4 deletions quixstreams/sources/base/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import signal
import threading
from pickle import PicklingError
from typing import List

from quixstreams.logging import LOGGER_NAME, configure_logging
from quixstreams.models import Topic
Expand All @@ -27,7 +28,7 @@ def __init__(self, source):
super().__init__()
self.source: BaseSource = source

self._exceptions: list[Exception] = []
self._exceptions: List[Exception] = []
self._started = False
self._stopping = False

Expand Down Expand Up @@ -167,7 +168,7 @@ class SourceManager:
"""

def __init__(self):
self.processes: list[SourceProcess] = []
self.processes: List[SourceProcess] = []

def register(self, source: BaseSource):
"""
Expand All @@ -187,11 +188,11 @@ def register(self, source: BaseSource):
return process

@property
def sources(self) -> list[BaseSource]:
def sources(self) -> List[BaseSource]:
return [process.source for process in self.processes]

@property
def topics(self) -> list[Topic]:
def topics(self) -> List[Topic]:
return [process.source.producer_topic for process in self.processes]

def start_sources(self) -> None:
Expand Down

0 comments on commit f3e0393

Please sign in to comment.