From 258b3860964bd01d05b48da7406977e4f3e04aa0 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Wed, 6 Nov 2024 00:01:38 +0000 Subject: [PATCH] test update --- .../ingestion/source/delta_lake/source.py | 48 ++++++++++--------- .../delta_lake/test_delta_lake_abs.py | 9 ++-- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py index 78982c63c6400..3339d0948b55b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py @@ -349,34 +349,38 @@ def get_storage_options(self) -> Dict[str, str]: elif self.source_config.is_azure: azure_config = self.source_config.azure.azure_config creds = azure_config.get_credentials() + parsed_url = urlparse(self.source_config.base_path) - opts = { - "fs.azure.account.name": azure_config.account_name, + opts: Dict[str, str] = { + "fs.azure.account.name": str(azure_config.account_name or ""), } - connection_string = ( - f"DefaultEndpointsProtocol={urlparse(self.source_config.base_path).scheme};" - f"AccountName={azure_config.account_name};" - ) + connection_string_parts = [ + f"DefaultEndpointsProtocol={parsed_url.scheme}", + f"AccountName={azure_config.account_name}", + ] if isinstance(creds, ClientSecretCredential): - opts.update( - { - "fs.azure.account.oauth2.client.id": azure_config.client_id, - "fs.azure.account.oauth2.client.secret": azure_config.client_secret, - "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{azure_config.tenant_id}/oauth2/token", - } - ) - elif azure_config.account_key: - connection_string += f"AccountKey={azure_config.account_key};" - elif azure_config.sas_token: - connection_string += ( - f"SharedAccessSignature={azure_config.sas_token.lstrip('?')};" - ) + if azure_config.client_id and azure_config.client_secret and azure_config.tenant_id: + opts.update( + { + "fs.azure.account.oauth2.client.id": str(azure_config.client_id), + "fs.azure.account.oauth2.client.secret": str(azure_config.client_secret), + "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{azure_config.tenant_id}/oauth2/token", + } + ) + else: + if azure_config.account_key: + connection_string_parts.append(f"AccountKey={azure_config.account_key}") + elif azure_config.sas_token: + connection_string_parts.append( + f"SharedAccessSignature={str(azure_config.sas_token or '').lstrip('?')}" + ) - if not isinstance(creds, ClientSecretCredential): - connection_string += f"BlobEndpoint={self.source_config.base_path.split('/' + azure_config.container_name)[0]}/" - opts["fs.azure.account.connection.string"] = connection_string + if not isinstance(creds, ClientSecretCredential): + endpoint = f"{self.source_config.base_path.split('/' + str(azure_config.container_name))[0]}/" + connection_string_parts.append(f"BlobEndpoint={endpoint}") + opts["fs.azure.account.connection.string"] = ";".join(connection_string_parts) return {k: v for k, v in opts.items() if v} else: diff --git a/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_abs.py b/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_abs.py index 177c31a43ff4b..f5a98bcaa6c16 100644 --- a/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_abs.py +++ b/metadata-ingestion/tests/integration/delta_lake/test_delta_lake_abs.py @@ -97,6 +97,9 @@ def populate_azure_storage(pytestconfig, azure_container): @freezegun.freeze_time("2023-01-01 00:00:00+00:00") def test_delta_lake_ingest_azure(pytestconfig, tmp_path, test_resources_dir): + base_path = f"http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1/test-container/delta_tables/sales" + + # Run the metadata ingestion pipeline. pipeline = Pipeline.create( { "run_id": "delta-lake-azure-test", @@ -104,7 +107,7 @@ def test_delta_lake_ingest_azure(pytestconfig, tmp_path, test_resources_dir): "type": "delta-lake", "config": { "env": "DEV", - "base_path": f"http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1/test-container/delta_tables/sales", + "base_path": base_path, "azure": { "azure_config": { "account_name": "devstoreaccount1", @@ -123,9 +126,7 @@ def test_delta_lake_ingest_azure(pytestconfig, tmp_path, test_resources_dir): } ) - logger.info( - f"Starting pipeline run with base_path: {pipeline.config['source']['config']['base_path']}" - ) + logger.info(f"Starting pipeline run with base_path: {base_path}") pipeline.run() pipeline.raise_from_status()