Skip to content

Commit

Permalink
More Notes from PR 201
Browse files Browse the repository at this point in the history
* Better handling of hostnames and ports so that they work both when the
  local performance manager is running in a docker image and when it is
  run directly on the command line.
* Create a metadata_staging migrations directory.
* In the performance manager staging migration, ensure that all metadata
  has been copied over to the metadata database before dropping the
  metadata table.
* Get alembic migration names from the environment.
* Improve environment validation to handle multiple databases.
  • Loading branch information
mzappitello committed Jan 9, 2024
1 parent dbe07bb commit 61323f8
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 57 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
BOOTSTRAPPED=1

# metadata database
MD_DB_HOST=local_rds
MD_DB_HOST=local_md_rds
MD_DB_PORT=5433
MD_DB_NAME=metadata
MD_DB_USER=postgres
MD_DB_PASSWORD=postgres
ALEMBIC_MD_DB_NAME=metadata_prod

# performance manager database
RPM_DB_HOST=local_rds
RPM_DB_HOST=local_rpm_rds
RPM_DB_PORT=5434
RPM_DB_NAME=performance_manager
RPM_DB_USER=postgres
Expand Down
5 changes: 3 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: '3'
services:

rail_pm_rds:
container_name: rail_pm_rds
container_name: ${RPM_DB_HOST}
image: postgres:14.4
env_file: .env
shm_size: '2gb'
Expand All @@ -15,7 +15,7 @@ services:
command: ["postgres", "-c", "log_statement=all"]

metadata_rds:
container_name: metadata_rds
container_name: ${MD_DB_HOST}
image: postgres:15
env_file: .env
shm_size: '2gb'
Expand Down Expand Up @@ -45,6 +45,7 @@ services:
build:
context: ./python_src
depends_on:
- rail_pm_rds
- metadata_rds
working_dir: /lamp
volumes:
Expand Down
5 changes: 5 additions & 0 deletions python_src/alembic.ini
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
version_locations = src/lamp_py/migrations/versions/performance_manager_prod

[metadata_staging]
sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
version_locations = src/lamp_py/migrations/versions/metadata_staging

[metadata_prod]
sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
Expand Down
5 changes: 3 additions & 2 deletions python_src/src/lamp_py/ingestion/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ def start() -> None:
"ERROR_BUCKET",
"INCOMING_BUCKET",
"SPRINGBOARD_BUCKET",
"ALEMBIC_MD_DB_NAME",
],
validate_db=True,
db_prefixes=["MD"],
)

# run metadata rds migrations
alembic_upgrade_to_head(db_name="metadata_prod")
alembic_upgrade_to_head(db_name=os.environ["ALEMBIC_MD_DB_NAME"])

# run the main method
main()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""initial changes
Revision ID: 07903947aabe
Revises:
Create Date: 2023-12-11 15:12:47.261091
"""
from alembic import op
from sqlalchemy.exc import ProgrammingError
import logging
import sqlalchemy as sa

from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
from lamp_py.postgres.metadata_schema import MetadataLog

# revision identifiers, used by Alembic.
revision = "07903947aabe"
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    """
    Create the metadata_log table and its partial index in the metadata
    database, then copy any records out of the legacy metadata_log table in
    the rail performance manager database.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "metadata_log",
        sa.Column("pk_id", sa.Integer(), nullable=False),
        sa.Column("rail_pm_processed", sa.Boolean(), nullable=True),
        sa.Column("rail_pm_process_fail", sa.Boolean(), nullable=True),
        sa.Column("path", sa.String(length=256), nullable=False),
        sa.Column(
            "created_on",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=True,
        ),
        sa.PrimaryKeyConstraint("pk_id"),
        sa.UniqueConstraint("path"),
    )
    # partial index used to find files that still need processing
    op.create_index(
        "ix_metadata_log_not_processed",
        "metadata_log",
        ["path"],
        unique=False,
        postgresql_where=sa.text("rail_pm_processed = false"),
    )

    # pull metadata from the rail performance manager database into the
    # metadata database. the table may or may not exist, so wrap this in a try
    # except
    try:
        rpm_db_manager = DatabaseManager(
            db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
        )
        md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)

        insert_data = []
        # pull metadata from the rail performance manager database via direct
        # sql query. the metadata_log table may or may not exist.
        with rpm_db_manager.session.begin() as session:
            result = session.execute(
                "SELECT path, processed, process_fail FROM metadata_log"
            )
            for row in result:
                (path, processed, process_fail) = row
                insert_data.append(
                    {
                        "path": path,
                        "rail_pm_processed": processed,
                        "rail_pm_process_fail": process_fail,
                    }
                )

        # insert data into the metadata database. skip the insert entirely
        # when the legacy table exists but is empty -- sqlalchemy raises an
        # error for an insert with an empty values list.
        if insert_data:
            with md_db_manager.session.begin() as session:
                # do nothing on conflicts in file paths
                session.execute(
                    sa.insert(MetadataLog.__table__).values(insert_data)
                )

    except ProgrammingError as error:
        # Error 42P01 is an 'Undefined Table' error. This occurs when there is
        # no metadata_log table in the rail performance manager database
        #
        # Raise all other sql errors
        if error.orig.pgcode == "42P01":
            logging.info("No Metadata Table in Rail Performance Manager")
        else:
            raise

    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the metadata_log table and its partial index."""
    # ### commands auto generated by Alembic - please adjust! ###
    # the index must go before the table it belongs to
    op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log")
    op.drop_table("metadata_log")
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
Revises: 45dedc21086e
Create Date: 2023-12-28 12:18:25.412282
check that all information in the metadata table has been copied to the
metadata database before dropping the table and its indexes entirely.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.exc import ProgrammingError
import logging
import sqlalchemy as sa

from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
from lamp_py.postgres.metadata_schema import MetadataLog

# revision identifiers, used by Alembic.
revision = "96187da84955"
Expand All @@ -18,15 +25,38 @@

def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"static_service_id_lookup",
sa.Column("dummy_pk", sa.Integer(), nullable=False),
sa.Column("route_id", sa.String(length=60), nullable=True),
sa.Column("service_id", sa.String(length=60), nullable=True),
sa.Column("service_date", sa.Integer(), nullable=True),
sa.Column("static_version_key", sa.Integer(), nullable=True),
sa.PrimaryKeyConstraint("dummy_pk"),
)
try:
rpm_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)
md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)

with rpm_db_manager.session.begin() as session:
legacy_result = session.execute("SELECT path FROM metadata_log")
legacy_paths = set(
[record[0] for record in legacy_result.fetchall()]
)

except ProgrammingError as error:
# Error 42P01 is an 'Undefined Table' error. This occurs when there is
# no metadata_log table in the rail performance manager database
#
# Raise all other sql errors
if error.orig.pgcode == "42P01":
logging.info("No Metadata Table in Rail Performance Manager")
legacy_paths = set()
else:
raise

modern_result = md_db_manager.select_as_list(sa.select(MetadataLog.path))
modern_paths = set([record["path"] for record in modern_result])

missing_paths = legacy_paths - modern_paths
if len(missing_paths) != 0:
raise Exception(
f"Detected {len(missing_paths)} in Legacy Metadata Table not found in Metadata Database."
)

op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log")
op.drop_table("metadata_log")
# ### end Alembic commands ###
Expand Down Expand Up @@ -63,5 +93,4 @@ def downgrade() -> None:
unique=False,
postgresql_where="(processed = false)",
)
op.drop_table("static_service_id_lookup")
# ### end Alembic commands ###
8 changes: 4 additions & 4 deletions python_src/src/lamp_py/performance_manager/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def main(args: argparse.Namespace) -> None:
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER, verbose=args.verbose
)
md_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER, verbose=args.verbose
db_index=DatabaseIndex.METADATA, verbose=args.verbose
)

# schedule object that will control the "event loop"
Expand Down Expand Up @@ -101,14 +101,14 @@ def start() -> None:
required_variables=[
"SPRINGBOARD_BUCKET",
"SERVICE_NAME",
"ALEMBIC_DB_NAME",
"ALEMBIC_RPM_DB_NAME",
],
optional_variables=["PUBLIC_ARCHIVE_BUCKET"],
validate_db=True,
db_prefixes=["RPM", "MD"],
)

# run rail performance manager rds migrations
alembic_upgrade_to_head(db_name=os.getenv("ALEMBIC_DB_NAME"))
alembic_upgrade_to_head(db_name=os.getenv("ALEMBIC_RPM_DB_NAME"))

# run main method with parsed args
main(parsed_args)
Expand Down
63 changes: 39 additions & 24 deletions python_src/src/lamp_py/postgres/postgres_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from enum import Enum, auto
from queue import Queue
from multiprocessing import Manager, Process
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union, Callable

import boto3
Expand All @@ -17,6 +16,7 @@
from lamp_py.aws.s3 import get_datetime_from_partition_path
from lamp_py.runtime_utils.process_logger import ProcessLogger

# from .rail_performance_manager_schema import LegacyMetadataLog
from .metadata_schema import MetadataLog


Expand All @@ -39,33 +39,48 @@ def running_in_aws() -> bool:
return bool(os.getenv("AWS_DEFAULT_REGION"))


def environ_get(var_name: str) -> str:
    """
    Fetch a required environment variable.

    Raising a KeyError with a descriptive message when the variable is unset
    also narrows the return type to str, which helps with type checking.
    """
    try:
        return os.environ[var_name]
    except KeyError:
        raise KeyError(f"Unable to find {var_name} in environment") from None


class PsqlArgs:
"""
container class for arguments needed to log into postgres db
"""

def __init__(self, prefix: str):
# when running application locally in CLI for configuration and
# debugging, db is accessed by localhost ip
self.host: str
if not running_in_docker() and not running_in_aws():
# running on the command line. use localhost ip
self.host = "127.0.0.1"
else:
host = os.environ.get(f"{prefix}_DB_HOST")
assert host is not None
self.host = host

port = os.environ.get(f"{prefix}_DB_PORT")
assert port is not None
self.port: str = port

name = os.environ.get(f"{prefix}_DB_NAME")
assert name is not None
self.name: str = name

user = os.environ.get(f"{prefix}_DB_USER")
assert user is not None
self.user: str = user

# running in docker, use the env variable pointing to the image
# name in the container.
# OR
# running on aws, use the env variable resolving to the aws rds
# instance
self.host = environ_get(f"{prefix}_DB_HOST")

self.port: str
if running_in_docker():
# running in docker, use the default port for postgres
self.port = "5432"
else:
# running on the command line, use the forwarded port out of the
# container
# OR
# running on aws, use the env var for the the aws rds instance
self.port = environ_get(f"{prefix}_DB_PORT")

self.name: str = environ_get(f"{prefix}_DB_NAME")
self.user: str = environ_get(f"{prefix}_DB_USER")
self.password: Optional[str] = os.environ.get(f"{prefix}_DB_PASSWORD")

def get_password(self) -> str:
Expand Down Expand Up @@ -180,22 +195,22 @@ class DatabaseIndex(Enum):
METADATA = auto()
RAIL_PERFORMANCE_MANAGER = auto()

@property
def env_prefix(self) -> str:
def get_env_prefix(self) -> str:
"""
in the environment, all keys for this database have this prefix
"""
if self is DatabaseIndex.RAIL_PERFORMANCE_MANAGER:
if self == DatabaseIndex.RAIL_PERFORMANCE_MANAGER:
return "RPM"
if self is DatabaseIndex.METADATA:
if self == DatabaseIndex.METADATA:
return "MD"
raise NotImplementedError("No environment prefix for index {self.name}")

def get_args_from_env(self) -> PsqlArgs:
"""
generate a sql argument instance for this ind
"""
return PsqlArgs(self.env_prefix)
prefix = self.get_env_prefix()
return PsqlArgs(prefix)


def generate_update_db_password_func(psql_args: PsqlArgs) -> Callable:
Expand Down
Loading

0 comments on commit 61323f8

Please sign in to comment.