From 840230cdcd5d0d3b56df8fa30839a75305e05701 Mon Sep 17 00:00:00 2001 From: Jo Kristian Bergum Date: Fri, 10 Nov 2023 11:34:10 +0100 Subject: [PATCH] Jobergum/document v1 id formatter (#626) * Refactor and implement slice delete * Improve tests * test failures * Add support for store-only and streaming mode --- .../source/examples/pyvespa-examples.ipynb | 2 +- vespa/application.py | 204 ++++++++++-------- vespa/package.py | 10 + vespa/templates/services.xml | 2 +- vespa/test_application.py | 56 +++-- vespa/test_integration_docker.py | 50 +++-- vespa/test_package.py | 37 ++++ 7 files changed, 232 insertions(+), 129 deletions(-) diff --git a/docs/sphinx/source/examples/pyvespa-examples.ipynb b/docs/sphinx/source/examples/pyvespa-examples.ipynb index 7e3aae99..a866ce4b 100644 --- a/docs/sphinx/source/examples/pyvespa-examples.ipynb +++ b/docs/sphinx/source/examples/pyvespa-examples.ipynb @@ -550,7 +550,7 @@ ], "source": [ "from vespa.io import VespaResponse\n", - "response:VespaResponse = app.get_data(data_id=0, **{\"format.tensors\": \"short-value\"})\n", + "response:VespaResponse = app.get_data(schema=\"neighbors\", data_id=0, **{\"format.tensors\": \"short-value\"})\n", "print(json.dumps(response.get_json(), indent=4))" ] }, diff --git a/vespa/application.py b/vespa/application.py index bdf7f0c2..347cb938 100644 --- a/vespa/application.py +++ b/vespa/application.py @@ -159,8 +159,7 @@ def _check_for_running_loop_and_run_coroutine(coro): future = executor.submit( Vespa._run_coroutine_new_event_loop, new_loop, coro ) - return_value = future.result() - return return_value + return future.result() except RuntimeError: return asyncio.run(coro) @@ -445,9 +444,7 @@ def delete_data( :param kwargs: Additional arguments to be passed to the HTTP DELETE request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters :return: Response of the HTTP DELETE request. """ - if not namespace: - namespace = schema - + with VespaSync(self, pool_connections=1, pool_maxsize=1) as sync_app: return sync_app.delete_data( schema=schema, data_id=data_id, namespace=namespace, **kwargs @@ -455,7 +452,7 @@ def delete_data( def delete_all_docs( - self, content_cluster_name: str, schema: str, namespace: str = None, **kwargs + self, content_cluster_name: str, schema: str, namespace: str = None, slices:int = 1, **kwargs ) -> Response: """ Delete all documents associated with the schema. This might block for a long time as @@ -464,39 +461,34 @@ def delete_all_docs( :param content_cluster_name: Name of content cluster to GET from, or visit. :param schema: The schema that we are deleting data from. :param namespace: The namespace that we are deleting data from. If no namespace is provided the schema is used. + :param slices: Number of slices to use for parallel delete requests. Defaults to 1. :param kwargs: Additional arguments to be passed to the HTTP DELETE request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters :return: Response of the HTTP DELETE request. """ - if not namespace: - namespace = schema - with VespaSync(self, pool_connections=1, pool_maxsize=1) as sync_app: + with VespaSync(self, pool_connections=slices, pool_maxsize=slices) as sync_app: return sync_app.delete_all_docs( content_cluster_name=content_cluster_name, namespace=namespace, - schema=schema, **kwargs + schema=schema, slices=slices, + **kwargs ) def get_data( - self, data_id: str, schema: Optional[str]=None, namespace: str = None, raise_on_not_found:Optional[bool]=False, **kwargs + self, schema:str, data_id: str, namespace: str = None, raise_on_not_found:Optional[bool]=False, **kwargs ) -> VespaResponse: """ Get a data point from a Vespa app. :param data_id: Unique id associated with this data point. :param schema: The schema that we are getting data from. Will attempt to infer schema name if not provided. + :param data_id: Unique id associated with this data point. :param namespace: The namespace that we are getting data from. If no namespace is provided the schema is used. :param raise_on_not_found: Raise an exception if the data_id is not found. Default is False. :param kwargs: Additional arguments to be passed to the HTTP GET request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters :return: Response of the HTTP GET request. """ - if not schema: - schema = self._infer_schema_name() - - if not namespace: - namespace = schema - with VespaSync(self,pool_connections=1,pool_maxsize=1) as sync_app: return sync_app.get_data( schema=schema, data_id=data_id, namespace=namespace, raise_on_not_found=raise_on_not_found, **kwargs @@ -522,8 +514,7 @@ def update_data( :param kwargs: Additional arguments to be passed to the HTTP PUT request. https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters :return: Response of the HTTP PUT request. """ - if not namespace: - namespace = schema + with VespaSync(self,pool_connections=1,pool_maxsize=1) as sync_app: return sync_app.update_data( @@ -569,8 +560,37 @@ def predict(self, x, model_id, function_name="output_0"): ) ) + def get_document_v1_path( + self, + id:str, + schema:Optional[str]=None, + namespace:Optional[str]=None, + group:Optional[str]=None, + number:Optional[str] = None) -> str: + """ + Convert to document v1 path + + :param id: The id of the document + :param namespace: The namespace of the document + :param schema: The schema of the document + :param group: The group of the document + :param number: The number of the document + :return: The path to the document v1 endpoint + """ + if not schema: + print("schema is not provided. Attempting to infer schema name.") + schema = self._infer_schema_name() + if not namespace: + namespace = schema + if number: + return f"/document/v1/{namespace}/{schema}/number/{number}/{id}" + if group: + return f"/document/v1/{namespace}/{schema}/group/{group}/{id}" + return f"/document/v1/{namespace}/{schema}/docid/{id}" + + class VespaSync(object): - def __init__(self, app: Vespa, pool_maxsize: int = 100, pool_connections=100) -> None: + def __init__(self, app: Vespa, pool_maxsize: int = 10, pool_connections=10) -> None: self.app = app if self.app.key: self.cert = (self.app.cert, self.app.key) @@ -662,11 +682,9 @@ def feed_data_point( :raises HTTPError: if one occurred """ - if not namespace: - namespace = schema - - end_point = "{}/document/v1/{}/{}/docid/{}".format( - self.app.end_point, namespace, schema, str(data_id) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}".format( + self.app.end_point, path ) vespa_format = {"fields": fields} response = self.http_session.post(end_point, json=vespa_format, params=kwargs) @@ -713,12 +731,12 @@ def delete_data( :return: Response of the HTTP DELETE request. :raises HTTPError: if one occurred """ - if not namespace: - namespace = schema - - end_point = "{}/document/v1/{}/{}/docid/{}".format( - self.app.end_point, namespace, schema, str(data_id) + + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}".format( + self.app.end_point, path ) + response = self.http_session.delete(end_point, params=kwargs) raise_for_status(response) return VespaResponse( @@ -729,14 +747,15 @@ def delete_data( ) def delete_all_docs( - self, content_cluster_name: str, schema: str, namespace: str = None, **kwargs - ) -> Response: + self, content_cluster_name: str, schema: str, namespace: str = None, slices:int=1,**kwargs + ) -> None: """ Delete all documents associated with the schema. :param content_cluster_name: Name of content cluster to GET from, or visit. :param schema: The schema that we are deleting data from. :param namespace: The namespace that we are deleting data from. + :param slices: Number of slices to use for parallel delete. :param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters) :return: Response of the HTTP DELETE request. :raises HTTPError: if one occurred @@ -744,26 +763,34 @@ def delete_all_docs( if not namespace: namespace = schema - end_point = "{}/document/v1/{}/{}/docid/?cluster={}&selection=true".format( - self.app.end_point, namespace, schema, content_cluster_name - ) - request_endpoint = end_point - last_response = None - while True: - try: - response = self.http_session.delete(request_endpoint, params=kwargs) - last_response = response - result = response.json() - if "continuation" in result: - request_endpoint = "{}&continuation={}".format( - end_point, result["continuation"] - ) - else: - break - except Exception as e: - print(e) - last_response = None - return last_response + def delete_slice(slice_id): + end_point = "{}/document/v1/{}/{}/docid/?cluster={}&selection=true&slices={}&sliceId={}".format( + self.app.end_point, namespace, schema, content_cluster_name, slices, slice_id + ) + request_endpoint = end_point + count = 0 + errors = 0 + while True: + try: + count += 1 + response = self.http_session.delete(request_endpoint, params=kwargs) + result = response.json() + if "continuation" in result: + request_endpoint = "{}&continuation={}".format( + end_point, result["continuation"] + ) + else: + break + except Exception as e: + errors += 1 + error_rate = errors / count + if error_rate > 0.1: + raise Exception("Too many errors for slice delete requests") from e + sleep(1) + + with ThreadPoolExecutor(max_workers=slices) as executor: + executor.map(delete_slice, range(slices)) + def get_data( self, schema: str, data_id: str, namespace: str = None, raise_on_not_found: Optional[bool]=False, **kwargs @@ -779,12 +806,11 @@ def get_data( :return: Response of the HTTP GET request. :raises HTTPError: if one occurred """ - if not namespace: - namespace = schema - - end_point = "{}/document/v1/{}/{}/docid/{}".format( - self.app.end_point, namespace, schema, str(data_id) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}".format( + self.app.end_point, path ) + response = self.http_session.get(end_point, params=kwargs) raise_for_status(response, raise_on_not_found=raise_on_not_found) return VespaResponse( @@ -815,11 +841,10 @@ def update_data( :return: Response of the HTTP PUT request. :raises HTTPError: if one occurred """ - if not namespace: - namespace = schema - - end_point = "{}/document/v1/{}/{}/docid/{}?create={}".format( - self.app.end_point, namespace, schema, str(data_id), str(create).lower() + + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}?create={}".format( + self.app.end_point, path, str(create).lower() ) vespa_format = {"fields": {k: {"assign": v} for k, v in fields.items()}} response = self.http_session.put(end_point, json=vespa_format, params=kwargs) @@ -834,7 +859,7 @@ def update_data( class VespaAsync(object): def __init__( - self, app: Vespa, connections: Optional[int] = 100, total_timeout: int = 10 + self, app: Vespa, connections: Optional[int] = 10, total_timeout: int = 180 ) -> None: self.app = app self.aiohttp_session = None @@ -888,24 +913,25 @@ async def _wait(f, args, **kwargs): @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3)) async def query( self, - body: Optional[Dict] = None, - ): - r = await self.aiohttp_session.post(self.app.search_end_point, json=body) + body: Optional[Dict] = None, + **kwargs + ) -> VespaQueryResponse: + r = await self.aiohttp_session.post(self.app.search_end_point, json=body, params=kwargs) return VespaQueryResponse( json=await r.json(), status_code=r.status, url=str(r.url) ) @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3)) async def feed_data_point( - self, schema: str, data_id: str, fields: Dict, namespace: str = None + self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs ) -> VespaResponse: - if not namespace: - namespace = schema - end_point = "{}/document/v1/{}/{}/docid/{}".format( - self.app.end_point, namespace, schema, str(data_id) + + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}".format( + self.app.end_point, path ) vespa_format = {"fields": fields} - response = await self.aiohttp_session.post(end_point, json=vespa_format) + response = await self.aiohttp_session.post(end_point, json=vespa_format, params=kwargs) return VespaResponse( json=await response.json(), status_code=response.status, @@ -915,15 +941,13 @@ async def feed_data_point( @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3)) async def delete_data( - self, schema: str, data_id: str, namespace: str = None + self, schema: str, data_id: str, namespace: str = None, **kwargs ) -> VespaResponse: - if not namespace: - namespace = schema - - end_point = "{}/document/v1/{}/{}/docid/{}".format( - self.app.end_point, namespace, schema, str(data_id) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}".format( + self.app.end_point, path ) - response = await self.aiohttp_session.delete(end_point) + response = await self.aiohttp_session.delete(end_point, params=kwargs) return VespaResponse( json=await response.json(), status_code=response.status, @@ -933,14 +957,13 @@ async def delete_data( @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3)) async def get_data( - self, schema: str, data_id: str, namespace: str = None + self, schema: str, data_id: str, namespace: str = None, **kwargs ) -> VespaResponse: - if not namespace: - namespace = schema - end_point = "{}/document/v1/{}/{}/docid/{}".format( - self.app.end_point, namespace, schema, str(data_id) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}".format( + self.app.end_point, path ) - response = await self.aiohttp_session.get(end_point) + response = await self.aiohttp_session.get(end_point, params=kwargs) return VespaResponse( json=await response.json(), status_code=response.status, @@ -956,15 +979,14 @@ async def update_data( fields: Dict, create: bool = False, namespace: str = None, + **kwargs ) -> VespaResponse: - if not namespace: - namespace = schema - - end_point = "{}/document/v1/{}/{}/docid/{}?create={}".format( - self.app.end_point, namespace, schema, str(data_id), str(create).lower() + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + end_point = "{}{}?create={}".format( + self.app.end_point, path, str(create).lower() ) vespa_format = {"fields": {k: {"assign": v} for k, v in fields.items()}} - response = await self.aiohttp_session.put(end_point, json=vespa_format) + response = await self.aiohttp_session.put(end_point, json=vespa_format, params=kwargs) return VespaResponse( json=await response.json(), status_code=response.status, diff --git a/vespa/package.py b/vespa/package.py index cec9ec03..e922e24b 100644 --- a/vespa/package.py +++ b/vespa/package.py @@ -1231,6 +1231,7 @@ def __init__( global_document: bool = False, imported_fields: Optional[List[ImportedField]] = None, document_summaries: Optional[List[DocumentSummary]] = None, + mode: Optional[str] = "index", **kwargs: Unpack[SchemaConfiguration], ) -> None: """ @@ -1247,6 +1248,7 @@ def __init__( :param global_document: Set to True to copy the documents to all content nodes. Default to False. :param imported_fields: A list of :class:`ImportedField` defining fields from global documents to be imported. :param document_summaries: A list of :class:`DocumentSummary` associated with the schema. + :param mode: Schema mode. Defaults to 'index'. Other options are 'store-only' and 'streaming'. :key stemming: The default stemming setting. Defaults to 'best'. To create a Schema: @@ -1258,6 +1260,14 @@ def __init__( self.document = document self.global_document = global_document + if mode not in ["index", "store-only", "streaming"]: + raise ValueError( + "Invalid mode: {0}. Valid options are 'index', 'store-only' and 'streaming'.".format( + mode + ) + ) + self.mode = mode + self.fieldsets = {} if fieldsets is not None: self.fieldsets = {fieldset.name: fieldset for fieldset in fieldsets} diff --git a/vespa/templates/services.xml b/vespa/templates/services.xml index 08852f24..9d274174 100644 --- a/vespa/templates/services.xml +++ b/vespa/templates/services.xml @@ -40,7 +40,7 @@ {% if schema.global_document %} {% else %} - + {% endif %} {% endfor %} diff --git a/vespa/test_application.py b/vespa/test_application.py index 7147191b..07c137c7 100644 --- a/vespa/test_application.py +++ b/vespa/test_application.py @@ -51,9 +51,9 @@ def test_delete_all_docs(self): app = Vespa(url="http://localhost", port=8080) with requests_mock.Mocker() as m: m.delete("http://localhost:8080/document/v1/foo/foo/docid/", status_code=200, text="{}") - r:VespaResponse = app.delete_all_docs( + app.delete_all_docs( schema="foo", namespace="foo", content_cluster_name="content", timeout="200s") - self.assertEqual(r.url, "http://localhost:8080/document/v1/foo/foo/docid/?cluster=content&selection=true&timeout=200s") + class TestVespa(unittest.TestCase): @@ -70,6 +70,38 @@ def test_end_point(self): self.assertEqual( Vespa(url="http://localhost:8080").end_point, "http://localhost:8080" ) + + def test_document_v1_format(self): + vespa = Vespa(url="http://localhost", port=8080) + self.assertEqual( + vespa.get_document_v1_path(id=0, schema="foo"), + "/document/v1/foo/foo/docid/0", + ) + self.assertEqual( + vespa.get_document_v1_path(id="0", schema="foo"), + "/document/v1/foo/foo/docid/0", + ) + + self.assertEqual( + vespa.get_document_v1_path(id="0", schema="foo", namespace="bar"), + "/document/v1/bar/foo/docid/0", + ) + + self.assertEqual( + vespa.get_document_v1_path(id="0", schema="foo", namespace="bar", group="g0"), + "/document/v1/bar/foo/group/g0/0", + ) + + self.assertEqual( + vespa.get_document_v1_path(id="0", schema="foo", namespace="bar", number="0"), + "/document/v1/bar/foo/number/0/0", + ) + + self.assertEqual( + vespa.get_document_v1_path(id="#1", schema="foo", namespace="bar", group="ab"), + "/document/v1/bar/foo/group/ab/#1", + ) + def test_query_token(self): self.assertEqual( @@ -98,16 +130,15 @@ def test_infer_schema(self): # No schema # app_package = ApplicationPackage(name="test") - # ToDo: re-enable this test later, maybe ... - #app = Vespa(url="http://localhost", port=8080, application_package=app_package) - #with self.assertRaisesRegex( - # ValueError, - # "Application has no schema. Not possible to infer schema name.", - #): - # _ = app._infer_schema_name() - # + + # app = Vespa(url="http://localhost", port=8080, application_package=app_package) + # with self.assertRaisesRegex( + # ValueError, + # "Application has no schema. Not possible to infer schema name.", + # ): + # _ = app._infer_schema_name() + # More than one schema - # app_package = ApplicationPackage( name="test", schema=[ @@ -121,9 +152,8 @@ def test_infer_schema(self): "Application has more than one schema. Not possible to infer schema name.", ): _ = app._infer_schema_name() - # + # One schema - # app_package = ApplicationPackage( name="test", schema=[ diff --git a/vespa/test_integration_docker.py b/vespa/test_integration_docker.py index 4305afc4..c1bee93d 100644 --- a/vespa/test_integration_docker.py +++ b/vespa/test_integration_docker.py @@ -405,12 +405,14 @@ def execute_data_operations( response = sync_app.feed_data_point( schema=schema_name, data_id=fields_to_send["id"], - fields=fields_to_send, + fields=fields_to_send, tracelevel=9 ) self.assertEqual( response.json["id"], "id:{}:{}::{}".format(schema_name, schema_name, fields_to_send["id"]), ) + self.assertTrue(response.is_successfull()) + self.assertTrue("trace" in response.json) async def execute_async_data_operations( self, @@ -432,18 +434,17 @@ async def execute_async_data_operations( There are cases where fields returned from Vespa are different from inputs, e.g. when dealing with Tensors. :return: """ - async with app.asyncio(connections=120, total_timeout=50) as async_app: + async with app.asyncio(connections=12, total_timeout=50) as async_app: # - # Get data that does not exist + # Get data that does not exist and test additional request params # response = await async_app.get_data( - schema=schema_name, data_id=fields_to_send[0]["id"] + schema=schema_name, data_id=fields_to_send[0]["id"], tracelevel=9 ) self.assertEqual(response.status_code, 404) + self.assertTrue("trace" in response.json) - # # Feed some data points - # feed = [] for fields in fields_to_send: feed.append( @@ -452,6 +453,7 @@ async def execute_async_data_operations( schema=schema_name, data_id=fields["id"], fields=fields, + timeout=10 ) ) ) @@ -477,13 +479,12 @@ async def execute_async_data_operations( ), ) - # # Get data that exists - # response = await async_app.get_data( - schema=schema_name, data_id=fields_to_send[0]["id"] + schema=schema_name, data_id=fields_to_send[0]["id"], timeout=180 ) self.assertEqual(response.status_code, 200) + self.assertEqual(response.is_successfull(), True) result = response.json self.assertDictEqual( result, @@ -498,14 +499,15 @@ async def execute_async_data_operations( }, ) # - # Update data + # date data # response = await async_app.update_data( schema=schema_name, data_id=field_to_update["id"], - fields=field_to_update, + fields=field_to_update, tracelevel=9 ) result = response.json + self.assertTrue("trace" in result) self.assertEqual( result["id"], "id:{}:{}::{}".format(schema_name, schema_name, field_to_update["id"]), @@ -536,43 +538,41 @@ async def execute_async_data_operations( ), }, ) - # + # Delete a data point - # response = await async_app.delete_data( - schema=schema_name, data_id=fields_to_send[0]["id"] + schema=schema_name, data_id=fields_to_send[0]["id"], tracelevel=9 ) result = response.json + self.assertTrue("trace" in result) self.assertEqual( result["id"], "id:{}:{}::{}".format( schema_name, schema_name, fields_to_send[0]["id"] ), ) - # + # Deleted data should be gone - # response = await async_app.get_data( - schema=schema_name, data_id=fields_to_send[0]["id"] + schema=schema_name, data_id=fields_to_send[0]["id"], tracelevel=9 ) self.assertEqual(response.status_code, 404) - # + self.assertTrue("trace" in response.json) + + # Issue a bunch of queries in parallel - # queries = [] for i in range(10): queries.append( asyncio.create_task( async_app.query( + yql="select * from sources * where true", body={ - "yql": 'select * from sources * where (userInput("sddocname:{}"))'.format( - schema_name - ), "ranking": { "profile": "default", "listFeatures": "false", }, - "timeout": 5000, + "timeout": 5, } ) ) @@ -581,6 +581,9 @@ async def execute_async_data_operations( self.assertEqual( queries[0].result().number_documents_indexed, len(fields_to_send) - 1 ) + for query in queries: + self.assertEqual(query.result().status_code, 200) + self.assertEqual(query.result().is_successfull(), True) def get_model_endpoints_when_no_model_is_available( @@ -712,6 +715,7 @@ def test_execute_async_data_operations(self): ) ) def tearDown(self) -> None: + self.app.delete_all_docs(content_cluster_name="content_msmarco", schema=self.app_package.name) self.vespa_docker.container.stop(timeout=CONTAINER_STOP_TIMEOUT) self.vespa_docker.container.remove() diff --git a/vespa/test_package.py b/vespa/test_package.py index 93fdd18b..cdb158a3 100644 --- a/vespa/test_package.py +++ b/vespa/test_package.py @@ -721,6 +721,43 @@ def test_query_profile_type_to_text(self): ) self.assertEqual(self.app_package.query_profile_type_to_text, expected_result) +class TestApplicationPackageStreaming(unittest.TestCase): + def setUp(self) -> None: + self.mail = Schema( + name="mail", + mode="streaming", + document=Document( + fields=[ + Field( + name="title", type="string", indexing=["attribute", "summary"] + ) + ]) + ) + self.app_package = ApplicationPackage( + name="testapp", + schema=[self.mail]) + + def test_generated_services_uses_mode_streaming(self): + expected_result = ( + '\n' + '\n' + ' \n' + " \n" + " \n" + " \n" + ' \n' + ' 1\n' + " \n" + ' \n' + " \n" + " \n" + ' \n' + " \n" + " \n" + "" + ) + self.assertEqual(self.app_package.services_to_text, expected_result) + class TestApplicationPackageMultipleSchema(unittest.TestCase): def setUp(self) -> None: