Skip to content

Commit

Permalink
azurite support for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Nov 5, 2024
1 parent a2277e0 commit e62759e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, Optional, Union
from urllib.parse import urlparse

from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient
Expand Down Expand Up @@ -59,10 +60,19 @@ def get_filesystem_client(self) -> FileSystemClient:
)

def get_blob_service_client(self):
return BlobServiceClient(
account_url=f"https://{self.account_name}.blob.core.windows.net",
credential=f"{self.get_credentials()}",
)
if self.base_path.startswith("http://"): # Azurite
connection_string = (
f"DefaultEndpointsProtocol=http;"
f"AccountName={self.account_name};"
f"AccountKey={self.account_key};"
f"BlobEndpoint=http://{urlparse(self.base_path).netloc}/{self.account_name}"
)
return BlobServiceClient.from_connection_string(connection_string)
else: # Real Azure Storage
return BlobServiceClient(
account_url=f"https://{self.account_name}.blob.core.windows.net",
credential=self.get_credentials(),
)

def get_data_lake_service_client(self) -> DataLakeServiceClient:
return DataLakeServiceClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ def __init__(self, config: DeltaLakeSourceConfig, ctx: PipelineContext):
self.source_config.azure is None
or self.source_config.azure.azure_config is None
):
raise ValueError("Azure Config must be provided for ABFSS base path")
self.azure_client = self.source_config.azure.azure_config.get_azure_client()
raise ValueError("Azure Config must be provided for Azure Blob Storage path")
self.azure_client = self.source_config.azure.azure_config.get_blob_service_client()

# self.profiling_times_taken = []
config_report = {
Expand Down Expand Up @@ -220,7 +220,10 @@ def ingest_table(
if self.source_config.is_s3:
browse_path = strip_s3_prefix(path)
elif self.source_config.is_azure:
browse_path = strip_abs_prefix(path)
if path.startswith('abfss://'):
browse_path = '/'.join(path.split('/', 3)[3:])
else:
browse_path = strip_abs_prefix(path)
else:
browse_path = path.strip("/")
else:
Expand Down Expand Up @@ -338,32 +341,40 @@ def get_storage_options(self) -> Dict[str, str]:
elif self.source_config.is_azure:
azure_config = self.source_config.azure.azure_config
creds = azure_config.get_credentials()
parsed_url = urlparse(self.source_config.base_path)

# Determine if we're using Azurite (http) or real Azure Storage (https)
is_azurite = parsed_url.scheme == "http"

opts = {
"AZURE_STORAGE_ACCOUNT_NAME": azure_config.account_name,
"AZURE_STORAGE_CONTAINER_NAME": azure_config.container_name,
"AZURE_TENANT_ID": "",
"AZURE_CLIENT_ID": "",
"AZURE_CLIENT_SECRET": "",
"AZURE_STORAGE_SAS_TOKEN": "",
"AZURE_STORAGE_ACCOUNT_KEY": "",
"account_name": azure_config.account_name,
"container_name": azure_config.container_name,
}

if isinstance(creds, ClientSecretCredential):
# Service Principal auth
opts.update(
{
"AZURE_TENANT_ID": azure_config.tenant_id or "",
"AZURE_CLIENT_ID": azure_config.client_id or "",
"AZURE_CLIENT_SECRET": azure_config.client_secret or "",
}
)
elif azure_config.sas_token and creds == azure_config.sas_token:
# SAS token auth
opts["AZURE_STORAGE_SAS_TOKEN"] = azure_config.sas_token
elif azure_config.account_key and creds == azure_config.account_key:
# Account key auth
opts["AZURE_STORAGE_ACCOUNT_KEY"] = azure_config.account_key
return opts
if is_azurite:
# For Azurite, we need to specify the endpoint URL and allow HTTP
opts.update({
"azure_storage_connection_string": (
f"DefaultEndpointsProtocol={parsed_url.scheme};"
f"AccountName={azure_config.account_name};"
f"AccountKey={azure_config.account_key};"
f"BlobEndpoint={parsed_url.scheme}://{parsed_url.netloc}/{azure_config.account_name}"
),
"allow_http_connection": "true",
})
else:
# For real Azure Storage, handle different auth methods
if isinstance(creds, ClientSecretCredential):
opts.update({
"tenant_id": azure_config.tenant_id,
"client_id": azure_config.client_id,
"client_secret": azure_config.client_secret,
})
else:
opts["credential"] = creds

# Remove empty values
return {k: v for k, v in opts.items() if v}
else:
return {}

Expand Down Expand Up @@ -394,21 +405,29 @@ def s3_get_folders(self, path: str) -> Iterable[str]:
yield f"{parse_result.scheme}://{parse_result.netloc}/{o.get('Prefix')}"

def azure_get_folders(self, path: str) -> Iterable[str]:
"""List folders from Azure Storage."""
parsed = urlparse(path)
prefix = parsed.path.lstrip("/")
container_client = self.azure_client.get_container_client(
parsed.netloc.split("@")[0]
)
"""List folders from Azure Blob Storage."""

try:
for item in container_client.walk_blobs(name_starts_with=prefix):
if isinstance(item, dict) and item.get("is_directory", False):
yield f"abfss://{parsed.netloc}/{item['name']}"
container_name = get_container_name(path)
prefix = get_key_prefix(path)
parsed_url = urlparse(path)

container_client = self.azure_client.get_container_client(container_name)

for blob in container_client.walk_blobs(name_starts_with=prefix, delimiter='/'):
if hasattr(blob, 'prefix'):
# Preserve the original URL scheme (http for Azurite, https for Azure)
folder_path = (
f"{parsed_url.scheme}://{parsed_url.netloc}/"
f"{container_name}/{blob.prefix}"
)
yield folder_path

except Exception as e:
self.report.report_failure(
"azure-folders", f"Failed to list ABFSS folders: {e}"
"azure-folders", f"Failed to list Azure Blob Storage folders: {e}"
)
logger.error(f"Error listing Azure folders: {str(e)}")

def local_get_folders(self, path: str) -> Iterable[str]:
if not os.path.isdir(path):
Expand Down

0 comments on commit e62759e

Please sign in to comment.