Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tags at table level #12

Merged
merged 5 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,4 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.envrc
245 changes: 215 additions & 30 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ readme = "README.md"

[tool.poetry.dependencies]
python = ">=3.10,<4.0"
snowflake-connector-python = "^3.7.1"
snowflake-connector-python = {extras = ["secure-local-storage"], version = "^3.12.1"}
pydantic-settings = "^2.2.1"
typer = "^0.12.0"

Expand Down
58 changes: 48 additions & 10 deletions snowflake_utils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Column(BaseModel):

class TableStructure(BaseModel):
columns: dict = [str, Column]
tags: dict[str, str] = Field(default_factory=dict)

@property
def parsed_columns(self, replace_chars: bool = False) -> str:
Expand Down Expand Up @@ -259,6 +260,7 @@ def copy_into(
primary_keys=primary_keys,
replication_keys=replication_keys,
)
self.sync_tags(cursor)
else:
return self._copy(
copy_query,
Expand Down Expand Up @@ -429,20 +431,56 @@ def single_column_update(
f"UPDATE {self.fqn} SET {target_column.name} = {new_column.name};"
)

def current_tags(self) -> dict[str, dict[str, str]]:
tags = defaultdict(dict)
def _current_tags(self, level: str) -> list[tuple[str, str, str]]:
pquadri marked this conversation as resolved.
Show resolved Hide resolved
with connect() as connection:
cursor = connection.cursor()
cursor.execute(
f"""select lower(column_name) as column_name, lower(tag_name) as tag_name, tag_value
from table(information_schema.tag_references_all_columns('{self.fqn}', 'table'))"""
from table(information_schema.tag_references_all_columns('{self.fqn}', 'table'))
where lower(level) = '{level}'
"""
)
for column_name, tag_name, tag_value in cursor.fetchall():
tags[column_name][tag_name] = tag_value
return cursor.fetchall()

def current_column_tags(self) -> dict[str, dict[str, str]]:
tags = defaultdict(dict)

for column_name, tag_name, tag_value in self._current_tags("column"):
tags[column_name][tag_name] = tag_value
return tags

def current_table_tags(self) -> dict[str, str]:
return {
tag_name.casefold(): tag_value
for _, tag_name, tag_value in self._current_tags("table")
}

def sync_tags_table(self, cursor: SnowflakeCursor) -> None:
tags = self.current_table_tags()
desired_tags = {k.casefold(): v for k, v in self.table_structure.tags.items()}
for tag_name in desired_tags:
if tag_name not in tags:
self._set_table_tag(cursor, desired_tags, tag_name)
for tag_name in tags:
if tag_name not in desired_tags:
self._unset_table_tag(cursor, tag_name)

def _unset_table_tag(self, cursor, tag_name):
cursor.execute(
f"ALTER TABLE {self.fqn} UNSET TAG {governance_settings.fqn(tag_name)}"
)

def _set_table_tag(self, cursor, desired_tags, tag_name):
cursor.execute(
f"ALTER TABLE {self.fqn} SET TAG {governance_settings.fqn(tag_name)} = '{desired_tags[tag_name]}'"
)

def sync_tags(self, cursor: SnowflakeCursor) -> None:
tags = self.current_tags()
self.sync_tags_table(cursor)
self.sync_tags_columns(cursor)

def sync_tags_columns(self, cursor: SnowflakeCursor) -> None:
tags = self.current_column_tags()
existing_tags = {
f"{column}.{tag_name}.{tags[column][tag_name]}".casefold(): (
column,
Expand All @@ -464,20 +502,20 @@ def sync_tags(self, cursor: SnowflakeCursor) -> None:
for tag in existing_tags:
if tag not in desired_tags:
column, tag_name, _value = existing_tags[tag]
self._unset_tag(cursor, column, tag_name)
self._unset_column_tag(cursor, column, tag_name)

for tag in desired_tags:
if tag not in existing_tags:
self._set_tag(cursor, *desired_tags[tag])
self._set_column_tag(cursor, *desired_tags[tag])

def _set_tag(
def _set_column_tag(
self, cursor: SnowflakeCursor, column: str, tag_name: str, tag_value: str
) -> None:
cursor.execute(
f"""ALTER TABLE {self.fqn} MODIFY COLUMN "{column.upper()}" SET TAG {governance_settings.fqn(tag_name)} = '{tag_value}'"""
)

def _unset_tag(self, cursor: SnowflakeCursor, column: str, tag: str) -> None:
def _unset_column_tag(self, cursor: SnowflakeCursor, column: str, tag: str) -> None:
cursor.execute(
f'ALTER TABLE {self.fqn} MODIFY COLUMN "{column.upper()}" UNSET TAG {governance_settings.fqn(tag)}'
)
Expand Down
Loading
Loading