From afcba0c90ec08e2e5c4bd378547a9f5f313fb61d Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 25 Oct 2024 17:51:33 +0100 Subject: [PATCH] Use a separate thread pool executor for warehouse pipeline --- posthog/temporal/data_imports/pipelines/pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index 45aeb1860b29c..f0d9961f4e665 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from typing import Literal from uuid import UUID @@ -253,7 +254,10 @@ def _run(self) -> dict[str, int]: async def run(self) -> dict[str, int]: try: - return await asyncio.to_thread(self._run) + # Use a dedicated thread pool to not interfere with the heartbeater thread + with ThreadPoolExecutor(max_workers=5) as pipeline_executor: + loop = asyncio.get_event_loop() + return await loop.run_in_executor(pipeline_executor, self._run) except PipelineStepFailed as e: self.logger.exception(f"Data import failed for endpoint with exception {e}", exc_info=e) raise