Skip to content

Commit

Permalink
Better handling of timeout and resets and increase timeout to align w…
Browse files Browse the repository at this point in the history
…ith Vespa
  • Loading branch information
Lester Solbakken committed Aug 28, 2023
1 parent 1a27ea4 commit becbe10
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@

import sys
import ssl
import json
import aiohttp
import asyncio
import requests
import traceback
import concurrent.futures
import json
from collections import Counter
from typing import Any, Optional, Dict, List, IO

import requests
from pandas import DataFrame
from requests import Session
from requests.models import Response
from requests.exceptions import ConnectionError, HTTPError, JSONDecodeError
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from tenacity import retry, wait_exponential, stop_after_attempt
from tenacity import retry, wait_exponential, stop_after_attempt, RetryError
from time import sleep

from vespa.exceptions import VespaError
Expand Down Expand Up @@ -369,7 +370,7 @@ def _feed_batch(
schema: Optional[str] = None,
asynchronous=True,
connections: Optional[int] = 100,
total_timeout: int = 100,
total_timeout: int = 240,
namespace: Optional[str] = None,
):
"""
Expand Down Expand Up @@ -415,7 +416,7 @@ def feed_batch(
schema: Optional[str] = None,
asynchronous=True,
connections: Optional[int] = 100,
total_timeout: int = 100,
total_timeout: int = 240,
namespace: Optional[str] = None,
batch_size=1000,
output: bool = True,
Expand Down Expand Up @@ -1118,9 +1119,15 @@ async def _feed_data_point_semaphore(
namespace = schema

async with semaphore:
return await self.feed_data_point(
schema=schema, data_id=data_id, fields=fields, namespace=namespace
)
try:
return await self.feed_data_point(
schema=schema, data_id=data_id, fields=fields, namespace=namespace
)
except RetryError as e:
print("Unable to feed data point after retries. Giving up. Cause:", file=sys.stderr)
if e.__cause__:
traceback.print_tb(e.__cause__.__traceback__)
e.reraise()

async def feed_batch(self, schema: str, batch: List[Dict], namespace=None):
if not namespace:
Expand Down

0 comments on commit becbe10

Please sign in to comment.