Skip to content

Commit

Permalink
Revert "chore(batch-exports): revert async session in async function (#…
Browse files Browse the repository at this point in the history
…25581)"

This reverts commit e565f54.
  • Loading branch information
EDsCODE authored Oct 15, 2024
1 parent e565f54 commit ca685d6
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 33 deletions.
72 changes: 47 additions & 25 deletions posthog/temporal/common/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import contextlib
import datetime as dt
import json
import ssl
import typing
import uuid

Expand Down Expand Up @@ -76,6 +77,13 @@ def encode_clickhouse_data(data: typing.Any, quote_char="'") -> bytes:
return f"{quote_char}{str_data}{quote_char}".encode()


class ClickHouseClientNotConnected(Exception):
"""Exception raised when attempting to run an async query without connecting."""

def __init__(self):
super().__init__("ClickHouseClient is not connected. Are you running in a context manager?")


class ClickHouseError(Exception):
"""Base Exception representing anything going wrong with ClickHouse."""

Expand All @@ -97,21 +105,21 @@ class ClickHouseClient:

def __init__(
self,
session: aiohttp.ClientSession | None = None,
url: str = "http://localhost:8123",
user: str = "default",
password: str = "",
database: str = "default",
timeout: None | aiohttp.ClientTimeout = None,
ssl: ssl.SSLContext | bool = True,
**kwargs,
):
if session is None:
self.session = aiohttp.ClientSession()
else:
self.session = session

self.url = url
self.headers = {}
self.params = {}
self.timeout = timeout
self.ssl = ssl
self.connector: None | aiohttp.TCPConnector = None
self.session: None | aiohttp.ClientSession = None

if user:
self.headers["X-ClickHouse-User"] = user
Expand All @@ -123,10 +131,9 @@ def __init__(
self.params.update(kwargs)

@classmethod
def from_posthog_settings(cls, session, settings, **kwargs):
def from_posthog_settings(cls, settings, **kwargs):
"""Initialize a ClickHouseClient from PostHog settings."""
return cls(
session=session,
url=settings.CLICKHOUSE_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
Expand All @@ -140,6 +147,9 @@ async def is_alive(self) -> bool:
Returns:
A boolean indicating whether the connection is alive.
"""
if self.session is None:
raise ClickHouseClientNotConnected()

try:
await self.session.get(
url=self.url,
Expand Down Expand Up @@ -217,6 +227,8 @@ async def aget_query(
Returns:
The response received from the ClickHouse HTTP interface.
"""
if self.session is None:
raise ClickHouseClientNotConnected()

params = {**self.params}
if query_id is not None:
Expand Down Expand Up @@ -245,6 +257,8 @@ async def apost_query(
Returns:
The response received from the ClickHouse HTTP interface.
"""
if self.session is None:
raise ClickHouseClientNotConnected()

params = {**self.params}
if query_id is not None:
Expand Down Expand Up @@ -378,11 +392,21 @@ async def astream_query_as_arrow(

async def __aenter__(self):
"""Enter method part of the AsyncContextManager protocol."""
self.connector = aiohttp.TCPConnector(ssl=self.ssl)
self.session = aiohttp.ClientSession(connector=self.connector, timeout=self.timeout)
return self

async def __aexit__(self, exc_type, exc_value, tb):
"""Exit method part of the AsyncContextManager protocol."""
await self.session.close()
if self.session is not None:
await self.session.close()

if self.connector is not None:
await self.connector.close()

self.session = None
self.connector = None
return False


@contextlib.asynccontextmanager
Expand Down Expand Up @@ -427,19 +451,17 @@ async def get_client(
team_id, settings.CLICKHOUSE_MAX_BLOCK_SIZE_DEFAULT
)

with aiohttp.TCPConnector(ssl=False) as connector:
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
async with ClickHouseClient(
session,
url=settings.CLICKHOUSE_OFFLINE_HTTP_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
database=settings.CLICKHOUSE_DATABASE,
max_execution_time=settings.CLICKHOUSE_MAX_EXECUTION_TIME,
max_memory_usage=settings.CLICKHOUSE_MAX_MEMORY_USAGE,
max_block_size=max_block_size,
cancel_http_readonly_queries_on_client_close=1,
output_format_arrow_string_as_string="true",
**kwargs,
) as client:
yield client
async with ClickHouseClient(
url=settings.CLICKHOUSE_OFFLINE_HTTP_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
database=settings.CLICKHOUSE_DATABASE,
timeout=timeout,
ssl=False,
max_execution_time=settings.CLICKHOUSE_MAX_EXECUTION_TIME,
max_memory_usage=settings.CLICKHOUSE_MAX_MEMORY_USAGE,
max_block_size=max_block_size,
output_format_arrow_string_as_string="true",
**kwargs,
) as client:
yield client
15 changes: 7 additions & 8 deletions posthog/temporal/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import asyncio
import random

import psycopg
import pytest
import pytest_asyncio
import temporalio.worker
from asgiref.sync import sync_to_async
from django.conf import settings
from temporalio.testing import ActivityEnvironment
import psycopg
from psycopg import sql
from temporalio.testing import ActivityEnvironment

from posthog.models import Organization, Team
from posthog.temporal.common.clickhouse import ClickHouseClient
Expand Down Expand Up @@ -65,10 +65,10 @@ def activity_environment():
return ActivityEnvironment()


@pytest.fixture(scope="module")
def clickhouse_client():
@pytest_asyncio.fixture(scope="module")
async def clickhouse_client():
"""Provide a ClickHouseClient to use in tests."""
client = ClickHouseClient(
async with ClickHouseClient(
url=settings.CLICKHOUSE_HTTP_URL,
user=settings.CLICKHOUSE_USER,
password=settings.CLICKHOUSE_PASSWORD,
Expand All @@ -78,9 +78,8 @@ def clickhouse_client():
# Durting testing, it's useful to enable it to wait for mutations.
# Otherwise, tests that rely on running a mutation may become flaky.
mutations_sync=2,
)

yield client
) as client:
yield client


@pytest_asyncio.fixture
Expand Down

0 comments on commit ca685d6

Please sign in to comment.