Skip to content

Commit

Permalink
Merge pull request #553 from vespa-engine/lesters/handle-timeouts-and…
Browse files Browse the repository at this point in the history
…-resets-better

Better handling of timeout and resets and increase timeout to align with vespa
  • Loading branch information
lesters authored Aug 29, 2023
2 parents 1a27ea4 + becbe10 commit b569ac8
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@

import sys
import ssl
import json
import aiohttp
import asyncio
import requests
import traceback
import concurrent.futures
import json
from collections import Counter
from typing import Any, Optional, Dict, List, IO

import requests
from pandas import DataFrame
from requests import Session
from requests.models import Response
from requests.exceptions import ConnectionError, HTTPError, JSONDecodeError
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from tenacity import retry, wait_exponential, stop_after_attempt
from tenacity import retry, wait_exponential, stop_after_attempt, RetryError
from time import sleep

from vespa.exceptions import VespaError
Expand Down Expand Up @@ -369,7 +370,7 @@ def _feed_batch(
schema: Optional[str] = None,
asynchronous=True,
connections: Optional[int] = 100,
total_timeout: int = 100,
total_timeout: int = 240,
namespace: Optional[str] = None,
):
"""
Expand Down Expand Up @@ -415,7 +416,7 @@ def feed_batch(
schema: Optional[str] = None,
asynchronous=True,
connections: Optional[int] = 100,
total_timeout: int = 100,
total_timeout: int = 240,
namespace: Optional[str] = None,
batch_size=1000,
output: bool = True,
Expand Down Expand Up @@ -1118,9 +1119,15 @@ async def _feed_data_point_semaphore(
namespace = schema

async with semaphore:
return await self.feed_data_point(
schema=schema, data_id=data_id, fields=fields, namespace=namespace
)
try:
return await self.feed_data_point(
schema=schema, data_id=data_id, fields=fields, namespace=namespace
)
except RetryError as e:
print("Unable to feed data point after retries. Giving up. Cause:", file=sys.stderr)
if e.__cause__:
traceback.print_tb(e.__cause__.__traceback__)
e.reraise()

async def feed_batch(self, schema: str, batch: List[Dict], namespace=None):
if not namespace:
Expand Down

0 comments on commit b569ac8

Please sign in to comment.