Skip to content

Commit

Permalink
Jobergum/document v1 id formatter (#626)
Browse files Browse the repository at this point in the history
* Refactor and implement slice delete

* Improve tests

* test failures

* Add support for store-only and streaming mode
  • Loading branch information
jobergum authored Nov 10, 2023
1 parent c6054b4 commit 840230c
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 129 deletions.
2 changes: 1 addition & 1 deletion docs/sphinx/source/examples/pyvespa-examples.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@
],
"source": [
"from vespa.io import VespaResponse\n",
"response:VespaResponse = app.get_data(data_id=0, **{\"format.tensors\": \"short-value\"})\n",
"response:VespaResponse = app.get_data(schema=\"neighbors\", data_id=0, **{\"format.tensors\": \"short-value\"})\n",
"print(json.dumps(response.get_json(), indent=4))"
]
},
Expand Down
204 changes: 113 additions & 91 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ def _check_for_running_loop_and_run_coroutine(coro):
future = executor.submit(
Vespa._run_coroutine_new_event_loop, new_loop, coro
)
return_value = future.result()
return return_value
return future.result()
except RuntimeError:
return asyncio.run(coro)

Expand Down Expand Up @@ -445,17 +444,15 @@ def delete_data(
:param kwargs: Additional arguments to be passed to the HTTP DELETE request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP DELETE request.
"""
if not namespace:
namespace = schema


with VespaSync(self, pool_connections=1, pool_maxsize=1) as sync_app:
return sync_app.delete_data(
schema=schema, data_id=data_id, namespace=namespace, **kwargs
)


def delete_all_docs(
self, content_cluster_name: str, schema: str, namespace: str = None, **kwargs
self, content_cluster_name: str, schema: str, namespace: str = None, slices:int = 1, **kwargs
) -> Response:
"""
Delete all documents associated with the schema. This might block for a long time as
Expand All @@ -464,39 +461,34 @@ def delete_all_docs(
:param content_cluster_name: Name of content cluster to GET from, or visit.
:param schema: The schema that we are deleting data from.
:param namespace: The namespace that we are deleting data from. If no namespace is provided the schema is used.
:param slices: Number of slices to use for parallel delete requests. Defaults to 1.
:param kwargs: Additional arguments to be passed to the HTTP DELETE request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP DELETE request.
"""
if not namespace:
namespace = schema

with VespaSync(self, pool_connections=1, pool_maxsize=1) as sync_app:
with VespaSync(self, pool_connections=slices, pool_maxsize=slices) as sync_app:
return sync_app.delete_all_docs(
content_cluster_name=content_cluster_name,
namespace=namespace,
schema=schema, **kwargs
schema=schema, slices=slices,
**kwargs
)

def get_data(
self, data_id: str, schema: Optional[str]=None, namespace: str = None, raise_on_not_found:Optional[bool]=False, **kwargs
self, schema:str, data_id: str, namespace: str = None, raise_on_not_found:Optional[bool]=False, **kwargs
) -> VespaResponse:
"""
Get a data point from a Vespa app.
:param data_id: Unique id associated with this data point.
:param schema: The schema that we are getting data from. Will attempt to infer schema name if not provided.
:param data_id: Unique id associated with this data point.
:param namespace: The namespace that we are getting data from. If no namespace is provided the schema is used.
:param raise_on_not_found: Raise an exception if the data_id is not found. Default is False.
:param kwargs: Additional arguments to be passed to the HTTP GET request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP GET request.
"""

if not schema:
schema = self._infer_schema_name()

if not namespace:
namespace = schema

with VespaSync(self,pool_connections=1,pool_maxsize=1) as sync_app:
return sync_app.get_data(
schema=schema, data_id=data_id, namespace=namespace, raise_on_not_found=raise_on_not_found, **kwargs
Expand All @@ -522,8 +514,7 @@ def update_data(
:param kwargs: Additional arguments to be passed to the HTTP PUT request. https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP PUT request.
"""
if not namespace:
namespace = schema


with VespaSync(self,pool_connections=1,pool_maxsize=1) as sync_app:
return sync_app.update_data(
Expand Down Expand Up @@ -569,8 +560,37 @@ def predict(self, x, model_id, function_name="output_0"):
)
)

def get_document_v1_path(
self,
id:str,
schema:Optional[str]=None,
namespace:Optional[str]=None,
group:Optional[str]=None,
number:Optional[str] = None) -> str:
"""
Convert to document v1 path
:param id: The id of the document
:param namespace: The namespace of the document
:param schema: The schema of the document
:param group: The group of the document
:param number: The number of the document
:return: The path to the document v1 endpoint
"""
if not schema:
print("schema is not provided. Attempting to infer schema name.")
schema = self._infer_schema_name()
if not namespace:
namespace = schema
if number:
return f"/document/v1/{namespace}/{schema}/number/{number}/{id}"
if group:
return f"/document/v1/{namespace}/{schema}/group/{group}/{id}"
return f"/document/v1/{namespace}/{schema}/docid/{id}"


class VespaSync(object):
def __init__(self, app: Vespa, pool_maxsize: int = 100, pool_connections=100) -> None:
def __init__(self, app: Vespa, pool_maxsize: int = 10, pool_connections=10) -> None:
self.app = app
if self.app.key:
self.cert = (self.app.cert, self.app.key)
Expand Down Expand Up @@ -662,11 +682,9 @@ def feed_data_point(
:raises HTTPError: if one occurred
"""

if not namespace:
namespace = schema

end_point = "{}/document/v1/{}/{}/docid/{}".format(
self.app.end_point, namespace, schema, str(data_id)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}".format(
self.app.end_point, path
)
vespa_format = {"fields": fields}
response = self.http_session.post(end_point, json=vespa_format, params=kwargs)
Expand Down Expand Up @@ -713,12 +731,12 @@ def delete_data(
:return: Response of the HTTP DELETE request.
:raises HTTPError: if one occurred
"""
if not namespace:
namespace = schema

end_point = "{}/document/v1/{}/{}/docid/{}".format(
self.app.end_point, namespace, schema, str(data_id)

path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}".format(
self.app.end_point, path
)

response = self.http_session.delete(end_point, params=kwargs)
raise_for_status(response)
return VespaResponse(
Expand All @@ -729,41 +747,50 @@ def delete_data(
)

def delete_all_docs(
self, content_cluster_name: str, schema: str, namespace: str = None, **kwargs
) -> Response:
self, content_cluster_name: str, schema: str, namespace: str = None, slices:int=1,**kwargs
) -> None:
"""
Delete all documents associated with the schema.
:param content_cluster_name: Name of content cluster to GET from, or visit.
:param schema: The schema that we are deleting data from.
:param namespace: The namespace that we are deleting data from.
:param slices: Number of slices to use for parallel delete.
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: Response of the HTTP DELETE request.
:raises HTTPError: if one occurred
"""
if not namespace:
namespace = schema

end_point = "{}/document/v1/{}/{}/docid/?cluster={}&selection=true".format(
self.app.end_point, namespace, schema, content_cluster_name
)
request_endpoint = end_point
last_response = None
while True:
try:
response = self.http_session.delete(request_endpoint, params=kwargs)
last_response = response
result = response.json()
if "continuation" in result:
request_endpoint = "{}&continuation={}".format(
end_point, result["continuation"]
)
else:
break
except Exception as e:
print(e)
last_response = None
return last_response
def delete_slice(slice_id):
end_point = "{}/document/v1/{}/{}/docid/?cluster={}&selection=true&slices={}&sliceId={}".format(
self.app.end_point, namespace, schema, content_cluster_name, slices, slice_id
)
request_endpoint = end_point
count = 0
errors = 0
while True:
try:
count += 1
response = self.http_session.delete(request_endpoint, params=kwargs)
result = response.json()
if "continuation" in result:
request_endpoint = "{}&continuation={}".format(
end_point, result["continuation"]
)
else:
break
except Exception as e:
errors += 1
error_rate = errors / count
if error_rate > 0.1:
raise Exception("Too many errors for slice delete requests") from e
sleep(1)

with ThreadPoolExecutor(max_workers=slices) as executor:
executor.map(delete_slice, range(slices))


def get_data(
self, schema: str, data_id: str, namespace: str = None, raise_on_not_found: Optional[bool]=False, **kwargs
Expand All @@ -779,12 +806,11 @@ def get_data(
:return: Response of the HTTP GET request.
:raises HTTPError: if one occurred
"""
if not namespace:
namespace = schema

end_point = "{}/document/v1/{}/{}/docid/{}".format(
self.app.end_point, namespace, schema, str(data_id)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}".format(
self.app.end_point, path
)

response = self.http_session.get(end_point, params=kwargs)
raise_for_status(response, raise_on_not_found=raise_on_not_found)
return VespaResponse(
Expand Down Expand Up @@ -815,11 +841,10 @@ def update_data(
:return: Response of the HTTP PUT request.
:raises HTTPError: if one occurred
"""
if not namespace:
namespace = schema

end_point = "{}/document/v1/{}/{}/docid/{}?create={}".format(
self.app.end_point, namespace, schema, str(data_id), str(create).lower()

path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}?create={}".format(
self.app.end_point, path, str(create).lower()
)
vespa_format = {"fields": {k: {"assign": v} for k, v in fields.items()}}
response = self.http_session.put(end_point, json=vespa_format, params=kwargs)
Expand All @@ -834,7 +859,7 @@ def update_data(

class VespaAsync(object):
def __init__(
self, app: Vespa, connections: Optional[int] = 100, total_timeout: int = 10
self, app: Vespa, connections: Optional[int] = 10, total_timeout: int = 180
) -> None:
self.app = app
self.aiohttp_session = None
Expand Down Expand Up @@ -888,24 +913,25 @@ async def _wait(f, args, **kwargs):
@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def query(
self,
body: Optional[Dict] = None,
):
r = await self.aiohttp_session.post(self.app.search_end_point, json=body)
body: Optional[Dict] = None,
**kwargs
) -> VespaQueryResponse:
r = await self.aiohttp_session.post(self.app.search_end_point, json=body, params=kwargs)
return VespaQueryResponse(
json=await r.json(), status_code=r.status, url=str(r.url)
)

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def feed_data_point(
self, schema: str, data_id: str, fields: Dict, namespace: str = None
self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs
) -> VespaResponse:
if not namespace:
namespace = schema
end_point = "{}/document/v1/{}/{}/docid/{}".format(
self.app.end_point, namespace, schema, str(data_id)

path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}".format(
self.app.end_point, path
)
vespa_format = {"fields": fields}
response = await self.aiohttp_session.post(end_point, json=vespa_format)
response = await self.aiohttp_session.post(end_point, json=vespa_format, params=kwargs)
return VespaResponse(
json=await response.json(),
status_code=response.status,
Expand All @@ -915,15 +941,13 @@ async def feed_data_point(

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def delete_data(
self, schema: str, data_id: str, namespace: str = None
self, schema: str, data_id: str, namespace: str = None, **kwargs
) -> VespaResponse:
if not namespace:
namespace = schema

end_point = "{}/document/v1/{}/{}/docid/{}".format(
self.app.end_point, namespace, schema, str(data_id)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}".format(
self.app.end_point, path
)
response = await self.aiohttp_session.delete(end_point)
response = await self.aiohttp_session.delete(end_point, params=kwargs)
return VespaResponse(
json=await response.json(),
status_code=response.status,
Expand All @@ -933,14 +957,13 @@ async def delete_data(

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def get_data(
self, schema: str, data_id: str, namespace: str = None
self, schema: str, data_id: str, namespace: str = None, **kwargs
) -> VespaResponse:
if not namespace:
namespace = schema
end_point = "{}/document/v1/{}/{}/docid/{}".format(
self.app.end_point, namespace, schema, str(data_id)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}".format(
self.app.end_point, path
)
response = await self.aiohttp_session.get(end_point)
response = await self.aiohttp_session.get(end_point, params=kwargs)
return VespaResponse(
json=await response.json(),
status_code=response.status,
Expand All @@ -956,15 +979,14 @@ async def update_data(
fields: Dict,
create: bool = False,
namespace: str = None,
**kwargs
) -> VespaResponse:
if not namespace:
namespace = schema

end_point = "{}/document/v1/{}/{}/docid/{}?create={}".format(
self.app.end_point, namespace, schema, str(data_id), str(create).lower()
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
end_point = "{}{}?create={}".format(
self.app.end_point, path, str(create).lower()
)
vespa_format = {"fields": {k: {"assign": v} for k, v in fields.items()}}
response = await self.aiohttp_session.put(end_point, json=vespa_format)
response = await self.aiohttp_session.put(end_point, json=vespa_format, params=kwargs)
return VespaResponse(
json=await response.json(),
status_code=response.status,
Expand Down
Loading

0 comments on commit 840230c

Please sign in to comment.