Skip to content

Commit

Permalink
feat(ingest/tableau): add retries on OSErrors (datahub-project#9596)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 11, 2024
1 parent f4d7778 commit 5476b52
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
37 changes: 31 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,15 @@ def get_connection_object_page(
count: int = 0,
offset: int = 0,
retry_on_auth_error: bool = True,
retries_remaining: Optional[int] = None,
) -> Tuple[dict, int, int]:
retries_remaining = retries_remaining or self.config.max_retries

logger.debug(
f"Query {connection_type} to get {count} objects with offset {offset}"
)
try:
assert self.server is not None
query_data = query_metadata(
self.server, query, connection_type, count, offset, query_filter
)
Expand All @@ -766,7 +770,31 @@ def get_connection_object_page(
# will be thrown and we need to re-authenticate and retry.
self._authenticate()
return self.get_connection_object_page(
query, connection_type, query_filter, count, offset, False
query,
connection_type,
query_filter,
count,
offset,
retry_on_auth_error=False,
retries_remaining=retries_remaining,
)
except OSError:
# In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04),
# the request logic was changed to use threads.
# https://github.com/tableau/server-client-python/commit/307d8a20a30f32c1ce615cca7c6a78b9b9bff081
# I'm not exactly sure why, but since then, we now occasionally see
# `OSError: Response is not a http response?` for some requests. This
# retry logic is basically a bandaid for that.
if retries_remaining <= 0:
raise
return self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
retry_on_auth_error=False,
retries_remaining=retries_remaining - 1,
)

if c.ERRORS in query_data:
Expand All @@ -781,11 +809,7 @@ def get_connection_object_page(
else:
raise RuntimeError(f"Query {connection_type} error: {errors}")

connection_object = (
query_data.get(c.DATA).get(connection_type, {})
if query_data.get(c.DATA)
else {}
)
connection_object = query_data.get(c.DATA, {}).get(connection_type, {})

total_count = connection_object.get(c.TOTAL_COUNT, 0)
has_next_page = connection_object.get(c.PAGE_INFO, {}).get(
Expand Down Expand Up @@ -2520,3 +2544,4 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def get_report(self) -> TableauSourceReport:
return self.report
return self.report
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Dict, List, Optional, Tuple

from pydantic.fields import Field
from tableauserverclient import Server

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
Expand Down Expand Up @@ -699,7 +700,6 @@ def get_overridden_info(
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> Tuple[Optional[str], Optional[str], str, str]:

original_platform = platform = get_platform(connection_type)
if (
lineage_overrides is not None
Expand Down Expand Up @@ -824,7 +824,14 @@ def clean_query(query: str) -> str:
return query


def query_metadata(server, main_query, connection_name, first, offset, qry_filter=""):
def query_metadata(
server: Server,
main_query: str,
connection_name: str,
first: int,
offset: int,
qry_filter: str = "",
) -> dict:
query = """{{
{connection_name} (first:{first}, offset:{offset}, filter:{{{filter}}})
{{
Expand Down

0 comments on commit 5476b52

Please sign in to comment.