From 9534361e5894cc92b0023aa5c2f590d1071c1959 Mon Sep 17 00:00:00 2001 From: Milo Hyben Date: Mon, 29 Jan 2024 15:29:42 +1100 Subject: [PATCH 1/3] Simple script to loop through cpg-infra-private commits information, reconstruct budget changes history per project and sync with BQ billing budget table. --- scripts/billing_update_budget_history.py | 224 +++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 scripts/billing_update_budget_history.py diff --git a/scripts/billing_update_budget_history.py b/scripts/billing_update_budget_history.py new file mode 100644 index 000000000..4a20c53b0 --- /dev/null +++ b/scripts/billing_update_budget_history.py @@ -0,0 +1,224 @@ +""" +This script goes through all the commit history to cpg-infrastructure-private and +check for changes to budgets.yaml files for each of the projects. +Once collected it gets checked agains budget BQ table and +it inserts any missing records. +At this stage it does not delete any records from BQ table. +""" +import argparse +import logging +import os +import re +import sys +from datetime import datetime, timezone + +import google.cloud.bigquery as bq + +# name of the BQ table to insert the records +SM_GCP_BQ_BUDGET_VIEW = os.getenv('SM_GCP_BQ_BUDGET_VIEW') +SM_GCP_BQ_AGGREG_VIEW = os.getenv('SM_GCP_BQ_AGGREG_VIEW') + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +# print logs to terminal as well +logger.addHandler(logging.StreamHandler()) + + +def get_gcp_project_names(): + """ + Returns list of projects active SM_GCP_BQ_AGGREG_VIEW. + SM_GCP_BQ_AGGREG_VIEW is aggregated by day so it is not expensive to do + a full scan on project column only. + + Reason we need actual gcp_project names is the cpg_infrastructure_private can + contain only the suffix. + E.g. project name in cpg-infra-private is 'seqr', + but gcp_project name is 'seqr-123456' + """ + query = f""" + SELECT DISTINCT gcp_project FROM `{SM_GCP_BQ_AGGREG_VIEW}` + WHERE gcp_project IS NOT NULL + """ + logger.info(f'Executing {query}') + bq_client = bq.Client() + query_job = bq_client.query( + query, + ) + results = query_job.result() + # this would contain the mapping of project name to gcp_project name + # considering last part as number specific for gcp_project name + projects = {} + for row in results: + gcp_project_name = row['gcp_project'] + # by default gcp and project name are the same + project_name = gcp_project_name + # get the suffix + gcp_name_suffix = gcp_project_name.split('-')[-1] + # check if gcp_name_suffix is number + if gcp_name_suffix.isdigit(): + # if yes remove the number and leading '-' + project_name = gcp_project_name.replace(f'-{gcp_name_suffix}', '') + + projects[project_name] = gcp_project_name + + return projects + + +def extract_budget_updates(folder_path: str, project_name: str): + """ + Execute cmd on the path and return the output. + """ + cmd = f'cd {folder_path}; git log -L2,+1:"{project_name}/budgets.yaml" --pretty="format:%ci"' + logger.info(f'Executing {cmd}') + output = os.popen(cmd).read() + logger.info(f'Output: {output}') + if not output: + logger.warning(f'Failed to get git history for {project_name}') + return + + lines = output.split('\n') + # loop through lines in the reverse order + # and look for the first line that has a date or + # contains string monthly_budget + budget_records = {} + last_budget_value = None + for line in reversed(lines): + if '+ monthly_budget' in line: + logger.info(f'Found monthly_budget for {project_name}') + # line is in the format '+ monthly_budget: XYZ' + last_budget_value = line.split(':')[1].strip() + + elif re.match(r'\d{4}-\d{2}-\d{2}', line): + logger.info(f'Found date {line} for {project_name}') + # 2023-03-02 10:30:32 +1100 + dt = datetime.strptime(line.strip(), '%Y-%m-%d %H:%M:%S %z') + budget_records[dt.astimezone(timezone.utc)] = last_budget_value + + return budget_records + + +def get_bq_budgets(): + """ + Get all the budget records from BQ table. + """ + query = f'SELECT * FROM {SM_GCP_BQ_BUDGET_VIEW}' + logger.info(f'Executing {query}') + bq_client = bq.Client() + query_job = bq_client.query( + query, + ) + results = query_job.result() + bq_budgets = {} + for row in results: + project_name = row['gcp_project'] + date = row['created_at'] + budget = row['budget'] + bq_budgets.setdefault(project_name, {})[date] = budget + + return bq_budgets + + +def process(folder_path: str): + """ + Loop through all the folders in the path and check for changes to budgets.yaml files. + """ + logger.info(f'Processing {folder_path}') + gcp_project_names = get_gcp_project_names() + project_budgets = {} + for root, dirs, files in os.walk(folder_path): + if 'budgets.yaml' in files: + logger.info(f'Found budgets.yaml in {root}') + # extract the project name from the path, e.g. /Users/xyz/cpg-infrastructure-private/xyz + project_name = root.split('/')[-1] + + if project_name in gcp_project_names: + # mapp the project name to gcp_name + gcp_project_name = gcp_project_names[project_name] + else: + logger.warning( + f'Could not find gcp_project name for {project_name}, ' + 'looks like brand new project' + ) + gcp_project_name = project_name + + project_budgets[gcp_project_name] = extract_budget_updates( + folder_path, project_name + ) + + # we have budget history for all the projects + # now check against BQ table + logger.info(f'Checking against BQ table {SM_GCP_BQ_BUDGET_VIEW}') + bq_budgets = get_bq_budgets() + # now compare the two + missing_records = {} + for project_name, budget_records in project_budgets.items(): + bq_project_budgets = bq_budgets.get(project_name, {}) + + # now compare individual datetime records per project + for date, budget in budget_records.items(): + if date not in bq_project_budgets: + missing_records.setdefault(project_name, {})[date] = budget + + logger.info(f'missing_records {missing_records}') + + # now insert the missing records + bq_client = bq.Client() + + logger.info(f'Inserting {len(missing_records)} missing records') + for project_name, budget_records in missing_records.items(): + for date, budget in budget_records.items(): + query_params = [ + bq.ScalarQueryParameter('project_name', 'STRING', project_name), + bq.ScalarQueryParameter('created_at', 'TIMESTAMP', date), + bq.ScalarQueryParameter('budget', 'INT64', budget), + # we only use AUD in budget case + bq.ScalarQueryParameter('currency', 'STRING', 'AUD'), + ] + + query = f"""INSERT INTO {SM_GCP_BQ_BUDGET_VIEW} + (gcp_project, created_at, budget, currency) + VALUES (@project_name, @created_at, @budget, @currency) + """ + logger.info(f'Executing {query}') + query_job = bq_client.query( + query, job_config=bq.QueryJobConfig(query_parameters=query_params) + ) + query_job.result() + logger.info(f'Inserted {project_name}, {date}, {budget}') + + logger.info('Done') + + +def main(): + """ + Expect path to cpg-infrastructure-private folder as command line argument + """ + parser = argparse.ArgumentParser() + parser.add_argument( + '-p', + '--cpg_infra_path', + help="Path to cpg-infrastructure-private folder", + type=str, + ) + args = parser.parse_args() + cpg_infra_path: str = args.cpg_infra_path + + if not os.path.isdir(cpg_infra_path): + print(f'{cpg_infra_path} is not a directory') + sys.exit(1) + + # process budget history + process(cpg_infra_path) + + +if __name__ == '__main__': + # check env vars + if not SM_GCP_BQ_BUDGET_VIEW: + print('SM_GCP_BQ_BUDGET_VIEW is not set') + sys.exit(1) + if not SM_GCP_BQ_AGGREG_VIEW: + print('SM_GCP_BQ_AGGREG_VIEW is not set') + sys.exit(1) + + # execute main function + main() From 6edd3ffcd2ff5466946bec7532e068b94d93113f Mon Sep 17 00:00:00 2001 From: Milo Hyben Date: Mon, 29 Jan 2024 16:19:34 +1100 Subject: [PATCH 2/3] Linting. --- scripts/billing_update_budget_history.py | 26 +++++++++++++----------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/scripts/billing_update_budget_history.py b/scripts/billing_update_budget_history.py index 4a20c53b0..af744aa45 100644 --- a/scripts/billing_update_budget_history.py +++ b/scripts/billing_update_budget_history.py @@ -24,7 +24,7 @@ logger.addHandler(logging.StreamHandler()) -def get_gcp_project_names(): +def get_gcp_project_names() -> dict[str, str]: """ Returns list of projects active SM_GCP_BQ_AGGREG_VIEW. SM_GCP_BQ_AGGREG_VIEW is aggregated by day so it is not expensive to do @@ -36,7 +36,7 @@ def get_gcp_project_names(): but gcp_project name is 'seqr-123456' """ query = f""" - SELECT DISTINCT gcp_project FROM `{SM_GCP_BQ_AGGREG_VIEW}` + SELECT DISTINCT gcp_project FROM `{SM_GCP_BQ_AGGREG_VIEW}` WHERE gcp_project IS NOT NULL """ logger.info(f'Executing {query}') @@ -64,40 +64,42 @@ def get_gcp_project_names(): return projects -def extract_budget_updates(folder_path: str, project_name: str): +def extract_budget_updates(folder_path: str, project_name: str) -> dict[datetime, int]: """ Execute cmd on the path and return the output. """ + budget_records: dict[datetime, int] = {} + cmd = f'cd {folder_path}; git log -L2,+1:"{project_name}/budgets.yaml" --pretty="format:%ci"' logger.info(f'Executing {cmd}') output = os.popen(cmd).read() logger.info(f'Output: {output}') if not output: logger.warning(f'Failed to get git history for {project_name}') - return + return budget_records lines = output.split('\n') # loop through lines in the reverse order # and look for the first line that has a date or # contains string monthly_budget - budget_records = {} last_budget_value = None for line in reversed(lines): if '+ monthly_budget' in line: logger.info(f'Found monthly_budget for {project_name}') # line is in the format '+ monthly_budget: XYZ' - last_budget_value = line.split(':')[1].strip() + last_budget_value = int(line.split(':')[1].strip()) - elif re.match(r'\d{4}-\d{2}-\d{2}', line): + elif last_budget_value and re.match(r'\d{4}-\d{2}-\d{2}', line): logger.info(f'Found date {line} for {project_name}') # 2023-03-02 10:30:32 +1100 dt = datetime.strptime(line.strip(), '%Y-%m-%d %H:%M:%S %z') budget_records[dt.astimezone(timezone.utc)] = last_budget_value + last_budget_value = None return budget_records -def get_bq_budgets(): +def get_bq_budgets() -> dict[str, dict[datetime, int]]: """ Get all the budget records from BQ table. """ @@ -108,7 +110,7 @@ def get_bq_budgets(): query, ) results = query_job.result() - bq_budgets = {} + bq_budgets: dict[str, dict[datetime, int]] = {} for row in results: project_name = row['gcp_project'] date = row['created_at'] @@ -125,7 +127,7 @@ def process(folder_path: str): logger.info(f'Processing {folder_path}') gcp_project_names = get_gcp_project_names() project_budgets = {} - for root, dirs, files in os.walk(folder_path): + for root, _dirs, files in os.walk(folder_path): if 'budgets.yaml' in files: logger.info(f'Found budgets.yaml in {root}') # extract the project name from the path, e.g. /Users/xyz/cpg-infrastructure-private/xyz @@ -150,7 +152,7 @@ def process(folder_path: str): logger.info(f'Checking against BQ table {SM_GCP_BQ_BUDGET_VIEW}') bq_budgets = get_bq_budgets() # now compare the two - missing_records = {} + missing_records: dict[str, dict[datetime, int]] = {} for project_name, budget_records in project_budgets.items(): bq_project_budgets = bq_budgets.get(project_name, {}) @@ -197,7 +199,7 @@ def main(): parser.add_argument( '-p', '--cpg_infra_path', - help="Path to cpg-infrastructure-private folder", + help='Path to cpg-infrastructure-private folder', type=str, ) args = parser.parse_args() From 3320cd79b4c2bdd94e860d8eac02b62560a379af Mon Sep 17 00:00:00 2001 From: Milo Hyben Date: Mon, 5 Feb 2024 10:31:02 +1100 Subject: [PATCH 3/3] Keeping original os.cmd directory as per PR review suggestion. --- scripts/billing_update_budget_history.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/scripts/billing_update_budget_history.py b/scripts/billing_update_budget_history.py index af744aa45..51192bef4 100644 --- a/scripts/billing_update_budget_history.py +++ b/scripts/billing_update_budget_history.py @@ -70,9 +70,15 @@ def extract_budget_updates(folder_path: str, project_name: str) -> dict[datetime """ budget_records: dict[datetime, int] = {} - cmd = f'cd {folder_path}; git log -L2,+1:"{project_name}/budgets.yaml" --pretty="format:%ci"' + cmd = f'git log -L2,+1:"{project_name}/budgets.yaml" --pretty="format:%ci"' logger.info(f'Executing {cmd}') + # save the current directory + cwd = os.getcwd() + # change the directory to the folder_path and execute the command + os.chdir(folder_path) output = os.popen(cmd).read() + # change back to the original directory + os.chdir(cwd) logger.info(f'Output: {output}') if not output: logger.warning(f'Failed to get git history for {project_name}')