Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batch-exports): Create async session in async function #25579

Merged
merged 2 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 47 additions & 25 deletions posthog/temporal/common/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import contextlib
import datetime as dt
import json
import ssl
import typing
import uuid

Expand Down Expand Up @@ -76,6 +77,13 @@ def encode_clickhouse_data(data: typing.Any, quote_char="'") -> bytes:
return f"{quote_char}{str_data}{quote_char}".encode()


class ClickHouseClientNotConnected(Exception):
"""Exception raised when attempting to run an async query without connecting."""

def __init__(self):
super().__init__("ClickHouseClient is not connected. Are you running in a context manager?")


class ClickHouseError(Exception):
"""Base Exception representing anything going wrong with ClickHouse."""

Expand All @@ -97,21 +105,21 @@ class ClickHouseClient:

def __init__(
self,
session: aiohttp.ClientSession | None = None,
url: str = "http://localhost:8123",
user: str = "default",
password: str = "",
database: str = "default",
timeout: None | aiohttp.ClientTimeout = None,
ssl: ssl.SSLContext | bool = True,
**kwargs,
):
if session is None:
self.session = aiohttp.ClientSession()
else:
self.session = session

self.url = url
self.headers = {}
self.params = {}
self.timeout = timeout
self.ssl = ssl
self.connector: None | aiohttp.TCPConnector = None
self.session: None | aiohttp.ClientSession = None

if user:
self.headers["X-ClickHouse-User"] = user
Expand All @@ -123,10 +131,9 @@ def __init__(
self.params.update(kwargs)

@classmethod
def from_posthog_settings(cls, session, settings, **kwargs):
def from_posthog_settings(cls, settings, **kwargs):
"""Initialize a ClickHouseClient from PostHog settings."""
return cls(
session=session,
url=settings.CLICKHOUSE_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
Expand All @@ -140,6 +147,9 @@ async def is_alive(self) -> bool:
Returns:
A boolean indicating whether the connection is alive.
"""
if self.session is None:
raise ClickHouseClientNotConnected()

try:
await self.session.get(
url=self.url,
Expand Down Expand Up @@ -217,6 +227,8 @@ async def aget_query(
Returns:
The response received from the ClickHouse HTTP interface.
"""
if self.session is None:
raise ClickHouseClientNotConnected()

params = {**self.params}
if query_id is not None:
Expand Down Expand Up @@ -245,6 +257,8 @@ async def apost_query(
Returns:
The response received from the ClickHouse HTTP interface.
"""
if self.session is None:
raise ClickHouseClientNotConnected()

params = {**self.params}
if query_id is not None:
Expand Down Expand Up @@ -378,11 +392,21 @@ async def astream_query_as_arrow(

async def __aenter__(self):
"""Enter method part of the AsyncContextManager protocol."""
self.connector = aiohttp.TCPConnector(ssl=self.ssl)
self.session = aiohttp.ClientSession(connector=self.connector, timeout=self.timeout)
return self

async def __aexit__(self, exc_type, exc_value, tb):
"""Exit method part of the AsyncContextManager protocol."""
await self.session.close()
if self.session is not None:
await self.session.close()

if self.connector is not None:
await self.connector.close()

self.session = None
self.connector = None
return False


@contextlib.asynccontextmanager
Expand Down Expand Up @@ -427,19 +451,17 @@ async def get_client(
team_id, settings.CLICKHOUSE_MAX_BLOCK_SIZE_DEFAULT
)

with aiohttp.TCPConnector(ssl=False) as connector:
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
async with ClickHouseClient(
session,
url=settings.CLICKHOUSE_OFFLINE_HTTP_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
database=settings.CLICKHOUSE_DATABASE,
max_execution_time=settings.CLICKHOUSE_MAX_EXECUTION_TIME,
max_memory_usage=settings.CLICKHOUSE_MAX_MEMORY_USAGE,
max_block_size=max_block_size,
cancel_http_readonly_queries_on_client_close=1,
output_format_arrow_string_as_string="true",
**kwargs,
) as client:
yield client
async with ClickHouseClient(
url=settings.CLICKHOUSE_OFFLINE_HTTP_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
database=settings.CLICKHOUSE_DATABASE,
timeout=timeout,
ssl=False,
max_execution_time=settings.CLICKHOUSE_MAX_EXECUTION_TIME,
max_memory_usage=settings.CLICKHOUSE_MAX_MEMORY_USAGE,
max_block_size=max_block_size,
output_format_arrow_string_as_string="true",
**kwargs,
) as client:
yield client
15 changes: 7 additions & 8 deletions posthog/temporal/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import asyncio
import random

import psycopg
import pytest
import pytest_asyncio
import temporalio.worker
from asgiref.sync import sync_to_async
from django.conf import settings
from temporalio.testing import ActivityEnvironment
import psycopg
from psycopg import sql
from temporalio.testing import ActivityEnvironment

from posthog.models import Organization, Team
from posthog.temporal.common.clickhouse import ClickHouseClient
Expand Down Expand Up @@ -65,10 +65,10 @@ def activity_environment():
return ActivityEnvironment()


@pytest.fixture(scope="module")
def clickhouse_client():
@pytest_asyncio.fixture(scope="module")
async def clickhouse_client():
"""Provide a ClickHouseClient to use in tests."""
client = ClickHouseClient(
async with ClickHouseClient(
url=settings.CLICKHOUSE_HTTP_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
Expand All @@ -78,9 +78,8 @@ def clickhouse_client():
# Durting testing, it's useful to enable it to wait for mutations.
# Otherwise, tests that rely on running a mutation may become flaky.
mutations_sync=2,
)

yield client
) as client:
yield client


@pytest_asyncio.fixture
Expand Down
Loading