Skip to content

Commit

Permalink
test update
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Nov 6, 2024
1 parent 41e242f commit 258b386
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,34 +349,38 @@ def get_storage_options(self) -> Dict[str, str]:
elif self.source_config.is_azure:
azure_config = self.source_config.azure.azure_config
creds = azure_config.get_credentials()
parsed_url = urlparse(self.source_config.base_path)

opts = {
"fs.azure.account.name": azure_config.account_name,
opts: Dict[str, str] = {
"fs.azure.account.name": str(azure_config.account_name or ""),
}

connection_string = (
f"DefaultEndpointsProtocol={urlparse(self.source_config.base_path).scheme};"
f"AccountName={azure_config.account_name};"
)
connection_string_parts = [
f"DefaultEndpointsProtocol={parsed_url.scheme}",
f"AccountName={azure_config.account_name}",
]

if isinstance(creds, ClientSecretCredential):
opts.update(
{
"fs.azure.account.oauth2.client.id": azure_config.client_id,
"fs.azure.account.oauth2.client.secret": azure_config.client_secret,
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{azure_config.tenant_id}/oauth2/token",
}
)
elif azure_config.account_key:
connection_string += f"AccountKey={azure_config.account_key};"
elif azure_config.sas_token:
connection_string += (
f"SharedAccessSignature={azure_config.sas_token.lstrip('?')};"
)
if azure_config.client_id and azure_config.client_secret and azure_config.tenant_id:
opts.update(
{
"fs.azure.account.oauth2.client.id": str(azure_config.client_id),
"fs.azure.account.oauth2.client.secret": str(azure_config.client_secret),
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{azure_config.tenant_id}/oauth2/token",
}
)
else:
if azure_config.account_key:
connection_string_parts.append(f"AccountKey={azure_config.account_key}")
elif azure_config.sas_token:
connection_string_parts.append(
f"SharedAccessSignature={str(azure_config.sas_token or '').lstrip('?')}"
)

if not isinstance(creds, ClientSecretCredential):
connection_string += f"BlobEndpoint={self.source_config.base_path.split('/' + azure_config.container_name)[0]}/"
opts["fs.azure.account.connection.string"] = connection_string
if not isinstance(creds, ClientSecretCredential):
endpoint = f"{self.source_config.base_path.split('/' + str(azure_config.container_name))[0]}/"
connection_string_parts.append(f"BlobEndpoint={endpoint}")
opts["fs.azure.account.connection.string"] = ";".join(connection_string_parts)

return {k: v for k, v in opts.items() if v}
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,17 @@ def populate_azure_storage(pytestconfig, azure_container):

@freezegun.freeze_time("2023-01-01 00:00:00+00:00")
def test_delta_lake_ingest_azure(pytestconfig, tmp_path, test_resources_dir):
base_path = f"http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1/test-container/delta_tables/sales"

# Run the metadata ingestion pipeline.
pipeline = Pipeline.create(
{
"run_id": "delta-lake-azure-test",
"source": {
"type": "delta-lake",
"config": {
"env": "DEV",
"base_path": f"http://localhost:{AZURITE_BLOB_PORT}/devstoreaccount1/test-container/delta_tables/sales",
"base_path": base_path,
"azure": {
"azure_config": {
"account_name": "devstoreaccount1",
Expand All @@ -123,9 +126,7 @@ def test_delta_lake_ingest_azure(pytestconfig, tmp_path, test_resources_dir):
}
)

logger.info(
f"Starting pipeline run with base_path: {pipeline.config['source']['config']['base_path']}"
)
logger.info(f"Starting pipeline run with base_path: {base_path}")
pipeline.run()
pipeline.raise_from_status()

Expand Down

0 comments on commit 258b386

Please sign in to comment.