From dd409f5ee8b56756f5718fcfd8f0e2fab610320a Mon Sep 17 00:00:00 2001 From: Jo Kristian Bergum Date: Sun, 12 Nov 2023 09:59:32 +0100 Subject: [PATCH] Jobergum/streaming mode (#629) * support streaming mode with embedders * Add support for streaming mode * wrong import * too much autocomplete --- vespa/application.py | 79 +++++++++++++++-------- vespa/templates/services.xml | 6 ++ vespa/test_integration_docker.py | 105 +++++++++++++++++++++++++++++++ vespa/test_package.py | 32 +++++++++- 4 files changed, 195 insertions(+), 27 deletions(-) diff --git a/vespa/application.py b/vespa/application.py index 347cb938..81fc5322 100644 --- a/vespa/application.py +++ b/vespa/application.py @@ -264,7 +264,7 @@ def get_model_endpoint(self, model_id: Optional[str] = None) -> Optional[Respons def query( self, - body: Optional[Dict] = None, **kwargs + body: Optional[Dict] = None, groupname:str=None, **kwargs ) -> VespaQueryResponse: """ Send a query request to the Vespa application. @@ -272,18 +272,19 @@ def query( Send 'body' containing all the request parameters. :param body: Dict containing request parameters. + :param groupname: The groupname used with streaming search. param kwargs: Extra Vespa Query API parameters. :return: The response from the Vespa application. """ #Use one connection as this is a single query with VespaSync(self,pool_maxsize=1, pool_connections=1) as sync_app: return sync_app.query( - body=body, **kwargs + body=body, groupname=groupname, **kwargs ) def feed_data_point( - self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs + self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str = None, **kwargs ) -> VespaResponse: """ Feed a data point to a Vespa app. Will create a new VespaSync with @@ -294,6 +295,7 @@ def feed_data_point( :param data_id: Unique id associated with this data point. :param fields: Dict containing all the fields required by the `schema`. 
:param namespace: The namespace that we are sending data to. + :param groupname: The groupname that we are sending data to. :return: VespaResponse of the HTTP POST request. """ if not namespace: @@ -302,7 +304,7 @@ def feed_data_point( # single data point with VespaSync(app=self, pool_connections=1,pool_maxsize=1) as sync_app: return sync_app.feed_data_point( - schema=schema, data_id=data_id, fields=fields, namespace=namespace, **kwargs + schema=schema, data_id=data_id, fields=fields, namespace=namespace, groupname=groupname, **kwargs ) def feed_iterable(self, @@ -394,15 +396,21 @@ def _submit(doc:dict, sync_session:VespaSync) -> Tuple[str, Union[VespaResponse, return id, VespaResponse(status_code=499, json={"id":id, "message":"Missing fields in input dict"}, url="n/a", operation_type=operation_type) + groupname = doc.get("groupname", None) try: if operation_type == "feed": - response:VespaResponse = sync_session.feed_data_point(schema=schema, namespace=namespace, data_id=id, fields=fields, **kwargs) + response:VespaResponse = sync_session.feed_data_point( + schema=schema, namespace=namespace, + groupname=groupname, data_id=id, fields=fields, **kwargs) return (id, response) elif operation_type == "update": - response:VespaResponse = sync_session.update_data(schema=schema, namespace=namespace, data_id=id, fields=fields, **kwargs) + response:VespaResponse = sync_session.update_data( + schema=schema, namespace=namespace, + groupname=groupname, data_id=id, fields=fields, **kwargs) return (id, response) elif operation_type == "delete": - response:VespaResponse = sync_session.delete_data(schema=schema, namespace=namespace, data_id=id, **kwargs) + response:VespaResponse = sync_session.delete_data( + schema=schema, namespace=namespace, data_id=id, groupname=groupname, **kwargs) return (id, response) except Exception as e: return (id, e) @@ -433,7 +441,7 @@ def _handle_result_callback(future:Future, callback:Callable): consumer_thread.join() def delete_data( - self, schema: 
str, data_id: str, namespace: str = None, **kwargs + self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs ) -> VespaResponse: """ Delete a data point from a Vespa app. @@ -441,13 +449,14 @@ def delete_data( :param schema: The schema that we are deleting data from. :param data_id: Unique id associated with this data point. :param namespace: The namespace that we are deleting data from. If no namespace is provided the schema is used. + :param groupname: The groupname that we are deleting data from. :param kwargs: Additional arguments to be passed to the HTTP DELETE request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters :return: Response of the HTTP DELETE request. """ with VespaSync(self, pool_connections=1, pool_maxsize=1) as sync_app: return sync_app.delete_data( - schema=schema, data_id=data_id, namespace=namespace, **kwargs + schema=schema, data_id=data_id, namespace=namespace, groupname=groupname, **kwargs ) @@ -475,7 +484,7 @@ def delete_all_docs( ) def get_data( - self, schema:str, data_id: str, namespace: str = None, raise_on_not_found:Optional[bool]=False, **kwargs + self, schema:str, data_id: str, namespace: str = None, groupname:str=None, raise_on_not_found:Optional[bool]=False, **kwargs ) -> VespaResponse: """ Get a data point from a Vespa app. @@ -484,6 +493,7 @@ def get_data( :param schema: The schema that we are getting data from. Will attempt to infer schema name if not provided. :param data_id: Unique id associated with this data point. :param namespace: The namespace that we are getting data from. If no namespace is provided the schema is used. + :param groupname: The groupname that we are getting data from. :param raise_on_not_found: Raise an exception if the data_id is not found. Default is False. 
:param kwargs: Additional arguments to be passed to the HTTP GET request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters :return: Response of the HTTP GET request. @@ -491,7 +501,8 @@ def get_data( with VespaSync(self,pool_connections=1,pool_maxsize=1) as sync_app: return sync_app.get_data( - schema=schema, data_id=data_id, namespace=namespace, raise_on_not_found=raise_on_not_found, **kwargs + schema=schema, data_id=data_id, namespace=namespace, groupname=groupname, + raise_on_not_found=raise_on_not_found, **kwargs ) def update_data( @@ -501,6 +512,7 @@ def update_data( fields: Dict, create: bool = False, namespace: str = None, + groupname:str=None, **kwargs ) -> VespaResponse: """ @@ -511,6 +523,7 @@ def update_data( :param fields: Dict containing all the fields you want to update. :param create: If true, updates to non-existent documents will create an empty document to update :param namespace: The namespace that we are updating data. If no namespace is provided the schema is used. + :param groupname: The groupname that we are updating data. :param kwargs: Additional arguments to be passed to the HTTP PUT request. https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters :return: Response of the HTTP PUT request. """ @@ -523,6 +536,7 @@ def update_data( fields=fields, create=create, namespace=namespace, + groupname=groupname, **kwargs ) @@ -668,7 +682,7 @@ def predict(self, model_id, function_name, encoded_tokens): return response def feed_data_point( - self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs + self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str=None, **kwargs ) -> VespaResponse: """ Feed a data point to a Vespa app. @@ -677,12 +691,13 @@ def feed_data_point( :param data_id: Unique id associated with this data point. :param fields: Dict containing all the fields required by the `schema`. 
:param namespace: The namespace that we are sending data to. If no namespace is provided the schema is used. + :param groupname: The group that we are sending data to. :param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters) :return: Response of the HTTP POST request. :raises HTTPError: if one occurred """ - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}".format( self.app.end_point, path ) @@ -699,6 +714,7 @@ def feed_data_point( def query( self, body: Optional[Dict] = None, + groupname:str=None, **kwargs ) -> VespaQueryResponse: """ @@ -707,10 +723,14 @@ def query( Send 'body' containing all the request parameters. :param body: Dict containing all the request parameters. + :param groupname: The groupname used in streaming search :param kwargs: Additional Valid Vespa HTTP Query Api parameters (https://docs.vespa.ai/en/reference/query-api-reference.html) :return: Either the request body if debug_request is True or the result from the Vespa application :raises HTTPError: if one occurred """ + + if groupname: + kwargs["streaming.groupname"] = groupname response = self.http_session.post(self.app.search_end_point, json=body, params=kwargs) raise_for_status(response) return VespaQueryResponse( @@ -718,7 +738,7 @@ def query( ) def delete_data( - self, schema: str, data_id: str, namespace: str = None, + self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs ) -> VespaResponse: """ @@ -732,11 +752,10 @@ def delete_data( :raises HTTPError: if one occurred """ - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}".format( self.app.end_point, path ) - response 
= self.http_session.delete(end_point, params=kwargs) raise_for_status(response) return VespaResponse( @@ -793,7 +812,8 @@ def delete_slice(slice_id): def get_data( - self, schema: str, data_id: str, namespace: str = None, raise_on_not_found: Optional[bool]=False, **kwargs + self, schema: str, data_id: str, namespace: str = None, groupname:str = None, + raise_on_not_found: Optional[bool]=False, **kwargs ) -> VespaResponse: """ Get a data point from a Vespa app. @@ -801,12 +821,13 @@ def get_data( :param schema: The schema that we are getting data from. :param data_id: Unique id associated with this data point. :param namespace: The namespace that we are getting data from. + :param groupname: The groupname used to get data :param raise_on_not_found: Raise an exception if the document is not found. :param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters) :return: Response of the HTTP GET request. :raises HTTPError: if one occurred """ - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}".format( self.app.end_point, path ) @@ -827,6 +848,7 @@ def update_data( fields: Dict, create: bool = False, namespace: str = None, + groupname:str = None, **kwargs ) -> VespaResponse: """ @@ -837,12 +859,13 @@ def update_data( :param fields: Dict containing all the fields you want to update. :param create: If true, updates to non-existent documents will create an empty document to update :param namespace: The namespace that we are updating data. + :param groupname: The groupname used to update data :param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters) :return: Response of the HTTP PUT request. 
:raises HTTPError: if one occurred """ - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}?create={}".format( self.app.end_point, path, str(create).lower() ) @@ -914,8 +937,11 @@ async def _wait(f, args, **kwargs): async def query( self, body: Optional[Dict] = None, + groupname:str = None, **kwargs ) -> VespaQueryResponse: + if groupname: + kwargs["streaming.groupname"] = groupname r = await self.aiohttp_session.post(self.app.search_end_point, json=body, params=kwargs) return VespaQueryResponse( json=await r.json(), status_code=r.status, url=str(r.url) @@ -923,10 +949,10 @@ async def query( @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3)) async def feed_data_point( - self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs + self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str=None, **kwargs ) -> VespaResponse: - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}".format( self.app.end_point, path ) @@ -941,9 +967,9 @@ async def feed_data_point( @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3)) async def delete_data( - self, schema: str, data_id: str, namespace: str = None, **kwargs + self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs ) -> VespaResponse: - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}".format( self.app.end_point, path ) @@ -957,9 +983,9 @@ async def delete_data( @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3)) async def get_data( - 
self, schema: str, data_id: str, namespace: str = None, **kwargs + self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs ) -> VespaResponse: - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}".format( self.app.end_point, path ) @@ -979,9 +1005,10 @@ async def update_data( fields: Dict, create: bool = False, namespace: str = None, + groupname:str=None, **kwargs ) -> VespaResponse: - path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace) + path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname) end_point = "{}{}?create={}".format( self.app.end_point, path, str(create).lower() ) diff --git a/vespa/templates/services.xml b/vespa/templates/services.xml index 9d274174..d94de2d0 100644 --- a/vespa/templates/services.xml +++ b/vespa/templates/services.xml @@ -11,6 +11,7 @@ {% if schemas %} + {% endif %} {% if components %} {% for component in components %} @@ -36,13 +37,18 @@ 1 + {% set streaming_modes = namespace(total = 0)%} {% for schema in schemas %} {% if schema.global_document %} {% else %} {% endif %} + {% if schema.mode == "streaming" %}{% set streaming_modes.total = 1 + streaming_modes.total %}{% endif %} {% endfor %} + {% if streaming_modes.total > 0 %} + + {% endif %} diff --git a/vespa/test_integration_docker.py b/vespa/test_integration_docker.py index c1bee93d..27124f3b 100644 --- a/vespa/test_integration_docker.py +++ b/vespa/test_integration_docker.py @@ -808,3 +808,108 @@ def test_execute_async_data_operations(self): def tearDown(self) -> None: self.vespa_docker.container.stop(timeout=CONTAINER_STOP_TIMEOUT) self.vespa_docker.container.remove() + +class TestStreamingApplication(unittest.TestCase): + def setUp(self) -> None: + document = Document( + fields=[ + Field(name="id", type="string", 
indexing=["attribute", "summary"]), + Field( + name="title", + type="string", + indexing=["index", "summary"], + index="enable-bm25", + ), + Field( + name="body", + type="string", + indexing=["index", "summary"], + index="enable-bm25", + ) + ] + ) + mail_schema = Schema( + name="mail", + mode="streaming", + document=document, + fieldsets=[FieldSet(name="default", fields=["title", "body"])], + rank_profiles=[ + RankProfile(name="default", first_phase="nativeRank(title, body)") + ]) + self.app_package = ApplicationPackage(name="mail", schema=[mail_schema]) + + self.vespa_docker = VespaDocker(port=8089) + self.app = self.vespa_docker.deploy(application_package=self.app_package) + + def test_streaming(self): + docs = [{ + "id": 1, + "groupname": "a@hotmail.com", + "fields": { + "title": "this is a title", + "body": "this is a body" + } + }, + { + "id": 1, + "groupname": "b@hotmail.com", + "fields": { + "title": "this is a title", + "body": "this is a body" + } + }, + { + "id": 2, + "groupname": "b@hotmail.com", + "fields": { + "title": "this is another title", + "body": "this is another body" + } + } + ] + self.app.wait_for_application_up(300) + + def callback(response:VespaResponse, id:str): + if not response.is_successfull(): + print("Id " + id + " + failed : " + response.json) + + self.app.feed_iterable(docs, schema="mail", namespace="test", callback=callback) + from vespa.io import VespaQueryResponse + response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'title'", groupname="a@hotmail.com") + self.assertTrue(response.is_successfull()) + self.assertEqual(response.number_documents_retrieved, 1) + + response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'title'", groupname="b@hotmail.com") + self.assertTrue(response.is_successfull()) + self.assertEqual(response.number_documents_retrieved, 2) + + with pytest.raises(Exception): + response:VespaQueryResponse = self.app.query(yql="select * 
from sources * where title contains 'title'") + + self.app.delete_data(schema="mail", namespace="test", data_id=2, groupname="b@hotmail.com") + + response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'title'", groupname="b@hotmail.com") + self.assertTrue(response.is_successfull()) + self.assertEqual(response.number_documents_retrieved, 1) + + self.app.update_data(schema="mail", namespace="test", data_id=1, groupname="b@hotmail.com", fields={"title": "this is a new foo"}) + response:VespaQueryResponse = self.app.query(yql="select * from sources * where title contains 'foo'", groupname="b@hotmail.com") + self.assertTrue(response.is_successfull()) + self.assertEqual(response.number_documents_retrieved, 1) + + response = self.app.get_data(schema="mail", namespace="test", data_id=1, groupname="b@hotmail.com") + self.assertDictEqual(response.json, + { + "pathId": "/document/v1/test/mail/group/b@hotmail.com/1", + "id": "id:test:mail:g=b@hotmail.com:1", + "fields": { + "body": "this is a body", + "title": "this is a new foo" + } + } + ) + + def tearDown(self) -> None: + self.vespa_docker.container.stop(timeout=CONTAINER_STOP_TIMEOUT) + self.vespa_docker.container.remove() + diff --git a/vespa/test_package.py b/vespa/test_package.py index cdb158a3..2c8fe3c8 100644 --- a/vespa/test_package.py +++ b/vespa/test_package.py @@ -688,6 +688,7 @@ def test_services_to_text(self): ' \n' " \n" " \n" + " \n" " \n" ' \n' ' 1\n' @@ -733,9 +734,29 @@ def setUp(self) -> None: ) ]) ) + self.calendar = Schema( + name="calendar", + mode="streaming", + document=Document( + fields=[ + Field( + name="title", type="string", indexing=["attribute", "summary"] + ) + ]) + ) + self.event = Schema( + name="event", + mode="index", + document=Document( + fields=[ + Field( + name="title", type="string", indexing=["attribute", "summary"] + ) + ]) + ) self.app_package = ApplicationPackage( name="testapp", - schema=[self.mail]) + schema=[self.mail, self.calendar, 
self.event]) def test_generated_services_uses_mode_streaming(self): expected_result = ( @@ -744,11 +765,15 @@ def test_generated_services_uses_mode_streaming(self): ' \n' " \n" " \n" + " \n" " \n" ' \n' ' 1\n' " \n" ' \n' + ' \n' + ' \n' + ' \n' " \n" " \n" ' \n' @@ -873,6 +898,7 @@ def test_services_to_text(self): ' \n' " \n" " \n" + " \n" " \n" ' \n' ' 1\n' @@ -1021,6 +1047,7 @@ def test_services_to_text(self): ' \n' " \n" " \n" + " \n" " \n" ' \n' ' 1\n' @@ -1108,6 +1135,7 @@ def test_services_to_text(self): ' \n' " \n" " \n" + " \n" " \n" ' \n' ' 1\n' @@ -1150,6 +1178,7 @@ def test_services_to_text(self): ' \n' " \n" " \n" + " \n" ' \n' ' \n' ' \n' @@ -1200,6 +1229,7 @@ def test_services_to_text(self): ' \n' " \n" " \n" + " \n" ' \n' ' \n' ' \n'