diff --git a/.env b/.env
index 499516e2..82218519 100644
--- a/.env
+++ b/.env
@@ -2,7 +2,7 @@
 BOOTSTRAPPED=1
 
 # metadata database
-MD_DB_HOST=local_rds
+MD_DB_HOST=local_md_rds
 MD_DB_PORT=5433
 MD_DB_NAME=metadata
 MD_DB_USER=postgres
@@ -10,7 +10,7 @@ MD_DB_PASSWORD=postgres
 ALEMBIC_MD_DB_NAME=metadata_prod
 
 # performance manager database
-RPM_DB_HOST=local_rds
+RPM_DB_HOST=local_rpm_rds
 RPM_DB_PORT=5434
 RPM_DB_NAME=performance_manager
 RPM_DB_USER=postgres
diff --git a/docker-compose.yml b/docker-compose.yml
index 186101b8..657eb375 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,7 +3,7 @@ version: '3'
 
 services:
   rail_pm_rds:
-    container_name: rail_pm_rds
+    container_name: ${RPM_DB_HOST}
     image: postgres:14.4
     env_file: .env
     shm_size: '2gb'
@@ -15,7 +15,7 @@ services:
     command: ["postgres", "-c", "log_statement=all"]
 
   metadata_rds:
-    container_name: metadata_rds
+    container_name: ${MD_DB_HOST}
     image: postgres:15
     env_file: .env
     shm_size: '2gb'
@@ -45,6 +45,7 @@ services:
     build:
       context: ./python_src
     depends_on:
+      - rail_pm_rds
       - metadata_rds
    working_dir: /lamp
    volumes:
diff --git a/python_src/alembic.ini b/python_src/alembic.ini
index 7456322a..9b591aca 100644
--- a/python_src/alembic.ini
+++ b/python_src/alembic.ini
@@ -78,6 +78,11 @@ sqlalchemy.url = driver://user:pass@localhost/dbname
 script_location = src/lamp_py/migrations
 version_locations = src/lamp_py/migrations/versions/performance_manager_prod
 
+[metadata_staging]
+sqlalchemy.url = driver://user:pass@localhost/dbname
+script_location = src/lamp_py/migrations
+version_locations = src/lamp_py/migrations/versions/metadata_staging
+
 [metadata_prod]
 sqlalchemy.url = driver://user:pass@localhost/dbname
 script_location = src/lamp_py/migrations
diff --git a/python_src/src/lamp_py/ingestion/pipeline.py b/python_src/src/lamp_py/ingestion/pipeline.py
index 96bc197b..1be3cbdd 100755
--- a/python_src/src/lamp_py/ingestion/pipeline.py
+++ b/python_src/src/lamp_py/ingestion/pipeline.py
@@ -79,12 +79,13 @@ def start() -> None:
             "ERROR_BUCKET",
             "INCOMING_BUCKET",
             "SPRINGBOARD_BUCKET",
+            "ALEMBIC_MD_DB_NAME",
         ],
-        validate_db=True,
+        db_prefixes=["MD"],
     )
 
     # run metadata rds migrations
-    alembic_upgrade_to_head(db_name="metadata_prod")
+    alembic_upgrade_to_head(db_name=os.environ["ALEMBIC_MD_DB_NAME"])
 
     # run the main method
     main()
diff --git a/python_src/src/lamp_py/migrations/versions/metadata_staging/07903947aabe_initial_changes.py b/python_src/src/lamp_py/migrations/versions/metadata_staging/07903947aabe_initial_changes.py
new file mode 100644
index 00000000..8b7659a9
--- /dev/null
+++ b/python_src/src/lamp_py/migrations/versions/metadata_staging/07903947aabe_initial_changes.py
@@ -0,0 +1,101 @@
+"""initial changes
+
+Revision ID: 07903947aabe
+Revises:
+Create Date: 2023-12-11 15:12:47.261091
+
+"""
+from alembic import op
+from sqlalchemy.exc import ProgrammingError
+import logging
+import sqlalchemy as sa
+
+from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
+from lamp_py.postgres.metadata_schema import MetadataLog
+
+# revision identifiers, used by Alembic.
+revision = "07903947aabe"
+down_revision = None
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "metadata_log",
+        sa.Column("pk_id", sa.Integer(), nullable=False),
+        sa.Column("rail_pm_processed", sa.Boolean(), nullable=True),
+        sa.Column("rail_pm_process_fail", sa.Boolean(), nullable=True),
+        sa.Column("path", sa.String(length=256), nullable=False),
+        sa.Column(
+            "created_on",
+            sa.DateTime(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=True,
+        ),
+        sa.PrimaryKeyConstraint("pk_id"),
+        sa.UniqueConstraint("path"),
+    )
+    op.create_index(
+        "ix_metadata_log_not_processed",
+        "metadata_log",
+        ["path"],
+        unique=False,
+        postgresql_where=sa.text("rail_pm_processed = false"),
+    )
+
+    # pull metadata from the rail performance manager database into the
+    # metadata database. the table may or may not exist, so wrap this in a try
+    # except
+    try:
+        rpm_db_manager = DatabaseManager(
+            db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
+        )
+        md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)
+
+        insert_data = []
+        # pull metadata from the rail performance manager database via direct
+        # sql query. the metadata_log table may or may not exist.
+        with rpm_db_manager.session.begin() as session:
+            result = session.execute(
+                "SELECT path, processed, process_fail FROM metadata_log"
+            )
+            for row in result:
+                (path, processed, process_fail) = row
+                insert_data.append(
+                    {
+                        "path": path,
+                        "rail_pm_processed": processed,
+                        "rail_pm_process_fail": process_fail,
+                    }
+                )
+
+        # insert data into the metadata database
+        with md_db_manager.session.begin() as session:
+            # do nothing on conflicts in file paths
+            result = session.execute(
+                sa.insert(MetadataLog.__table__).values(insert_data)
+            )
+
+    except ProgrammingError as error:
+        # Error 42P01 is an 'Undefined Table' error. This occurs when there is
+        # no metadata_log table in the rail performance manager database
+        #
+        # Raise all other sql errors
+        if error.orig.pgcode == "42P01":
+            logging.info("No Metadata Table in Rail Performance Manager")
+        else:
+            raise
+
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index(
+        "ix_metadata_log_not_processed",
+        table_name="metadata_log",
+    )
+    op.drop_table("metadata_log")
+    # ### end Alembic commands ###
diff --git a/python_src/src/lamp_py/migrations/versions/performance_manager_staging/005_96187da84955_remove_metadata.py b/python_src/src/lamp_py/migrations/versions/performance_manager_staging/005_96187da84955_remove_metadata.py
index 7f94e246..7c065ace 100644
--- a/python_src/src/lamp_py/migrations/versions/performance_manager_staging/005_96187da84955_remove_metadata.py
+++ b/python_src/src/lamp_py/migrations/versions/performance_manager_staging/005_96187da84955_remove_metadata.py
@@ -4,10 +4,17 @@
 Revises: 45dedc21086e
 Create Date: 2023-12-28 12:18:25.412282
 
+check that all information in the metadata table has been copied to the
+metadata database before dropping the table and its indexes entirely.
 """
 from alembic import op
-import sqlalchemy as sa
 from sqlalchemy.dialects import postgresql
+from sqlalchemy.exc import ProgrammingError
+import logging
+import sqlalchemy as sa
+
+from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
+from lamp_py.postgres.metadata_schema import MetadataLog
 
 # revision identifiers, used by Alembic.
 revision = "96187da84955"
@@ -18,15 +25,38 @@
 
 def upgrade() -> None:
     # ### commands auto generated by Alembic - please adjust! ###
-    op.create_table(
-        "static_service_id_lookup",
-        sa.Column("dummy_pk", sa.Integer(), nullable=False),
-        sa.Column("route_id", sa.String(length=60), nullable=True),
-        sa.Column("service_id", sa.String(length=60), nullable=True),
-        sa.Column("service_date", sa.Integer(), nullable=True),
-        sa.Column("static_version_key", sa.Integer(), nullable=True),
-        sa.PrimaryKeyConstraint("dummy_pk"),
-    )
+    try:
+        rpm_db_manager = DatabaseManager(
+            db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
+        )
+        md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)
+
+        with rpm_db_manager.session.begin() as session:
+            legacy_result = session.execute("SELECT path FROM metadata_log")
+            legacy_paths = set(
+                [record[0] for record in legacy_result.fetchall()]
+            )
+
+    except ProgrammingError as error:
+        # Error 42P01 is an 'Undefined Table' error. This occurs when there is
+        # no metadata_log table in the rail performance manager database
+        #
+        # Raise all other sql errors
+        if error.orig.pgcode == "42P01":
+            logging.info("No Metadata Table in Rail Performance Manager")
+            legacy_paths = set()
+        else:
+            raise
+
+    modern_result = md_db_manager.select_as_list(sa.select(MetadataLog.path))
+    modern_paths = set([record["path"] for record in modern_result])
+
+    missing_paths = legacy_paths - modern_paths
+    if len(missing_paths) != 0:
+        raise Exception(
+            f"Detected {len(missing_paths)} paths in Legacy Metadata Table not found in Metadata Database."
+        )
+
     op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log")
     op.drop_table("metadata_log")
     # ### end Alembic commands ###
@@ -63,5 +93,4 @@
         unique=False,
         postgresql_where="(processed = false)",
     )
-    op.drop_table("static_service_id_lookup")
     # ### end Alembic commands ###
diff --git a/python_src/src/lamp_py/performance_manager/pipeline.py b/python_src/src/lamp_py/performance_manager/pipeline.py
index 5468ec50..0d9afbb6 100755
--- a/python_src/src/lamp_py/performance_manager/pipeline.py
+++ b/python_src/src/lamp_py/performance_manager/pipeline.py
@@ -56,7 +56,7 @@ def main(args: argparse.Namespace) -> None:
         db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER, verbose=args.verbose
     )
     md_db_manager = DatabaseManager(
-        db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER, verbose=args.verbose
+        db_index=DatabaseIndex.METADATA, verbose=args.verbose
     )
 
     # schedule object that will control the "event loop"
@@ -101,14 +101,14 @@ def start() -> None:
         required_variables=[
             "SPRINGBOARD_BUCKET",
             "SERVICE_NAME",
-            "ALEMBIC_DB_NAME",
+            "ALEMBIC_RPM_DB_NAME",
         ],
         optional_variables=["PUBLIC_ARCHIVE_BUCKET"],
-        validate_db=True,
+        db_prefixes=["RPM", "MD"],
     )
 
     # run rail performance manager rds migrations
-    alembic_upgrade_to_head(db_name=os.getenv("ALEMBIC_DB_NAME"))
+    alembic_upgrade_to_head(db_name=os.getenv("ALEMBIC_RPM_DB_NAME"))
 
     # run main method with parsed args
     main(parsed_args)
diff --git a/python_src/src/lamp_py/postgres/postgres_utils.py b/python_src/src/lamp_py/postgres/postgres_utils.py
index 9eedb377..407e30a3 100644
--- a/python_src/src/lamp_py/postgres/postgres_utils.py
+++ b/python_src/src/lamp_py/postgres/postgres_utils.py
@@ -4,7 +4,6 @@
 from enum import Enum, auto
 from queue import Queue
 from multiprocessing import Manager, Process
-from typing import Any, Dict, List, Optional, Tuple, Union
 from typing import Any, Dict, List, Optional, Tuple, Union, Callable
 
 import boto3
@@ -17,6 +16,7 @@
 from lamp_py.aws.s3 import get_datetime_from_partition_path
 from lamp_py.runtime_utils.process_logger import ProcessLogger
 
+# from .rail_performance_manager_schema import LegacyMetadataLog
 from .metadata_schema import MetadataLog
 
 
@@ -39,33 +39,48 @@ def running_in_aws() -> bool:
     return bool(os.getenv("AWS_DEFAULT_REGION"))
 
 
+def environ_get(var_name: str) -> str:
+    """
+    get an environment variable, raising an error if it does not exist. this
+    utility helps with type checking.
+    """
+    value = os.environ.get(var_name)
+    if value is None:
+        raise KeyError(f"Unable to find {var_name} in environment")
+    return value
+
+
 class PsqlArgs:
     """
     container class for arguments needed to log into postgres db
     """
 
     def __init__(self, prefix: str):
-        # when running application locally in CLI for configuration and
-        # debugging, db is accessed by localhost ip
+        self.host: str
         if not running_in_docker() and not running_in_aws():
+            # running on the command line. use localhost ip
             self.host = "127.0.0.1"
         else:
-            host = os.environ.get(f"{prefix}_DB_HOST")
-            assert host is not None
-            self.host = host
-
-        port = os.environ.get(f"{prefix}_DB_PORT")
-        assert port is not None
-        self.port: str = port
-
-        name = os.environ.get(f"{prefix}_DB_NAME")
-        assert name is not None
-        self.name: str = name
-
-        user = os.environ.get(f"{prefix}_DB_USER")
-        assert user is not None
-        self.user: str = user
-
+            # running in docker, use the env variable pointing to the image
+            # name in the container.
+            # OR
+            # running on aws, use the env variable resolving to the aws rds
+            # instance
+            self.host = environ_get(f"{prefix}_DB_HOST")
+
+        self.port: str
+        if running_in_docker():
+            # running in docker, use the default port for postgres
+            self.port = "5432"
+        else:
+            # running on the command line, use the forwarded port out of the
+            # container
+            # OR
+            # running on aws, use the env var for the aws rds instance
+            self.port = environ_get(f"{prefix}_DB_PORT")
+
+        self.name: str = environ_get(f"{prefix}_DB_NAME")
+        self.user: str = environ_get(f"{prefix}_DB_USER")
         self.password: Optional[str] = os.environ.get(f"{prefix}_DB_PASSWORD")
 
     def get_password(self) -> str:
@@ -180,14 +195,13 @@ class DatabaseIndex(Enum):
     METADATA = auto()
     RAIL_PERFORMANCE_MANAGER = auto()
 
-    @property
-    def env_prefix(self) -> str:
+    def get_env_prefix(self) -> str:
         """
         in the environment, all keys for this database have this prefix
         """
-        if self is DatabaseIndex.RAIL_PERFORMANCE_MANAGER:
+        if self == DatabaseIndex.RAIL_PERFORMANCE_MANAGER:
             return "RPM"
-        if self is DatabaseIndex.METADATA:
+        if self == DatabaseIndex.METADATA:
             return "MD"
 
         raise NotImplementedError("No environment prefix for index {self.name}")
@@ -195,7 +209,8 @@ def get_args_from_env(self) -> PsqlArgs:
         """
         generate a sql argument instance for this ind
         """
-        return PsqlArgs(self.env_prefix)
+        prefix = self.get_env_prefix()
+        return PsqlArgs(prefix)
 
 
 def generate_update_db_password_func(psql_args: PsqlArgs) -> Callable:
diff --git a/python_src/src/lamp_py/runtime_utils/env_validation.py b/python_src/src/lamp_py/runtime_utils/env_validation.py
index f923f4f2..a19e9624 100644
--- a/python_src/src/lamp_py/runtime_utils/env_validation.py
+++ b/python_src/src/lamp_py/runtime_utils/env_validation.py
@@ -8,7 +8,7 @@ def validate_environment(
     required_variables: List[str],
     private_variables: Optional[List[str]] = None,
     optional_variables: Optional[List[str]] = None,
-    validate_db: bool = False,
+    db_prefixes: Optional[List[str]] = None,
 ) -> None:
     """
     ensure that the environment has all the variables its required to have
@@ -24,17 +24,18 @@ def validate_environment(
         required_variables.append("SERVICE_NAME")
 
     # add required database variables
-    if validate_db:
-        required_variables += [
-            "DB_HOST",
-            "DB_NAME",
-            "DB_PORT",
-            "DB_USER",
-        ]
-        # if db password is missing, db region is required to generate a
-        # token to use as the password to the cloud database
-        if os.environ.get("DB_PASSWORD", None) is None:
-            required_variables.append("DB_REGION")
+    if db_prefixes is not None:
+        for prefix in db_prefixes:
+            required_variables += [
+                f"{prefix}_DB_HOST",
+                f"{prefix}_DB_NAME",
+                f"{prefix}_DB_PORT",
+                f"{prefix}_DB_USER",
+            ]
+            # if db password is missing, db region is required to generate a
+            # token to use as the password to the cloud database
+            if os.environ.get(f"{prefix}_DB_PASSWORD", None) is None:
+                required_variables.append("DB_REGION")
 
     # check for missing variables. add found variables to our logs.
     missing_required = []
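
Usage note: a minimal sketch (not part of the patch) of how a service entrypoint is expected to call the new per-database validation after this change, using only names that appear in the diff above. The required_variables list here is illustrative and varies per service:

    from lamp_py.runtime_utils.env_validation import validate_environment
    from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager

    # require MD_DB_HOST / MD_DB_NAME / MD_DB_PORT / MD_DB_USER (and, when
    # MD_DB_PASSWORD is unset, DB_REGION) instead of the old unprefixed DB_* set
    validate_environment(
        required_variables=["ALEMBIC_MD_DB_NAME"],
        db_prefixes=["MD"],
    )

    # DatabaseIndex.METADATA resolves the "MD" prefix internally, so the
    # manager connects using the MD_* variables validated above
    md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)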