Skip to content

Commit

Permalink
Update source.py
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Nov 5, 2024
1 parent 0808bdf commit 0309764
Showing 1 changed file with 22 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,23 +224,15 @@ def ingest_table(
if self.source_config.is_s3:
browse_path = strip_s3_prefix(path)
elif self.source_config.is_azure:
if path.startswith("abfss://"):
browse_path = "/".join(path.split("/", 3)[3:])
elif path.startswith(("http://", "https://")):
parsed_url = urlparse(path)
if ".blob.core.windows.net" in parsed_url.netloc:
# Regular Azure Storage
browse_path = strip_abs_prefix(path)
else:
# Azurite or other local endpoint
# Skip the account name and container parts of the path
path_parts = parsed_url.path.split("/")
if len(path_parts) > 2:
browse_path = "/".join(path_parts[2:])
else:
browse_path = ""
# Get just container/path format instead of full HTTPS URL
container_name = get_container_name(path)
key_prefix = get_key_prefix(path)
if path.startswith('http'):
browse_path = f"{container_name}/{key_prefix}" if key_prefix else container_name
else:
browse_path = path.strip("/")
browse_path = strip_abs_prefix(path)
else:
browse_path = path.strip("/")
else:
browse_path = path.split(self.source_config.base_path)[1].strip("/")

Expand Down Expand Up @@ -358,41 +350,30 @@ def get_storage_options(self) -> Dict[str, str]:
creds = azure_config.get_credentials()
parsed_url = urlparse(self.source_config.base_path)

# Determine if we're using Azurite (http) or real Azure Storage (https)
is_azurite = parsed_url.scheme == "http"

opts = {
"account_name": azure_config.account_name,
"container_name": azure_config.container_name,
}

if is_azurite:
# For Azurite, we need to specify the endpoint URL and allow HTTP
if isinstance(creds, ClientSecretCredential):
# Service principal auth
opts.update(
{
"azure_storage_connection_string": (
f"DefaultEndpointsProtocol={parsed_url.scheme};"
f"AccountName={azure_config.account_name};"
f"AccountKey={azure_config.account_key};"
f"BlobEndpoint={parsed_url.scheme}://{parsed_url.netloc}/{azure_config.account_name}"
),
"allow_http_connection": "true",
"tenant_id": azure_config.tenant_id,
"client_id": azure_config.client_id,
"client_secret": azure_config.client_secret,
}
)
else:
# For real Azure Storage, handle different auth methods
if isinstance(creds, ClientSecretCredential):
opts.update(
{
"tenant_id": azure_config.tenant_id,
"client_id": azure_config.client_id,
"client_secret": azure_config.client_secret,
}
)
else:
opts["credential"] = creds
elif azure_config.account_key or azure_config.sas_token:
# Account key or SAS token auth - use connection string
opts["connection_string"] = (
f"DefaultEndpointsProtocol={parsed_url.scheme};"
f"AccountName={azure_config.account_name};"
f"AccountKey={azure_config.account_key or ''};"
f"SharedAccessSignature={azure_config.sas_token or ''};"
f"BlobEndpoint={parsed_url.scheme}://{parsed_url.netloc}/{azure_config.account_name}"
)

# Remove empty values
return {k: v for k, v in opts.items() if v}
else:
return {}
Expand Down Expand Up @@ -437,7 +418,6 @@ def azure_get_folders(self, path: str) -> Iterable[str]:
name_starts_with=prefix, delimiter="/"
):
if hasattr(blob, "prefix"):
# Preserve the original URL scheme (http for Azurite, https for Azure)
folder_path = (
f"{parsed_url.scheme}://{parsed_url.netloc}/"
f"{container_name}/{blob.prefix}"
Expand Down

0 comments on commit 0309764

Please sign in to comment.