Create and Use Metadata Schema / RDS
The metadata database will be managed by the ingestion application and used
to record created springboard files and their processing status in the bus
and rail performance manager applications.

* Create a schema for the database (sketched below) and add migration
  configuration paths to pull existing data out of the rail performance
  manager.
* Update the rail performance manager schema to drop the metadata table
  and add a migration that ensures all data has been moved over before
  the drop.
* Expand the `DatabaseManager` class to connect to either of the two dbs
  and improve the handling of host names and ports so that they work
  whether the local performance manager is running in a docker image or
  directly on the command line.
* Update the seed metadata script to match the expected behavior.
* Update the postgres version in the yaml files for the metadata rds to
  match the version deployed on aws.
* Improve environment validation to handle multiple databases.
* Update tests to use both database managers.
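
A minimal sketch of the table model behind the new schema, mirroring the columns created by the 07903947aabe migrations below; the real MetadataSqlBase / MetadataLog definitions live in lamp_py.postgres.metadata_schema and are not part of the diffs shown here:

# sketch only -- mirrors migration 07903947aabe below, not the committed model
import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

MetadataSqlBase = declarative_base()


class MetadataLog(MetadataSqlBase):
    """One row per springboard file written by the ingestion application."""

    __tablename__ = "metadata_log"

    pk_id = sa.Column(sa.Integer, primary_key=True)
    rail_pm_processed = sa.Column(sa.Boolean, nullable=True)
    rail_pm_process_fail = sa.Column(sa.Boolean, nullable=True)
    path = sa.Column(sa.String(256), nullable=False, unique=True)
    created_on = sa.Column(
        sa.DateTime(timezone=True), server_default=sa.func.now()
    )


# partial index the rail performance manager can use to find unprocessed files
sa.Index(
    "ix_metadata_log_not_processed",
    MetadataLog.path,
    postgresql_where=(MetadataLog.rail_pm_processed == sa.false()),
)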
mzappitello committed Jan 17, 2024
1 parent f2e898c commit 1d2846d
Showing 19 changed files with 632 additions and 272 deletions.
4 changes: 2 additions & 2 deletions .env
@@ -2,15 +2,15 @@
BOOTSTRAPPED=1

# metadata database
MD_DB_HOST=local_rds
MD_DB_HOST=local_md_rds
MD_DB_PORT=5433
MD_DB_NAME=metadata
MD_DB_USER=postgres
MD_DB_PASSWORD=postgres
ALEMBIC_MD_DB_NAME=metadata_prod

# performance manager database
RPM_DB_HOST=local_rds
RPM_DB_HOST=local_rpm_rds
RPM_DB_PORT=5434
RPM_DB_NAME=performance_manager
RPM_DB_USER=postgres
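The renamed hosts keep the two databases distinct; each set of prefixed variables is read by DatabaseIndex.get_args_from_env(), which is referenced in env.py and the migrations below. A hedged sketch of that lookup, with the PsqlArgs container being an assumption rather than the committed class:

# sketch only -- PsqlArgs and this exact structure are assumptions
import os
from dataclasses import dataclass
from enum import Enum


@dataclass
class PsqlArgs:
    """Connection arguments for one of the two local databases."""

    host: str
    port: str
    name: str
    user: str
    password: str


class DatabaseIndex(Enum):
    METADATA = "MD"
    RAIL_PERFORMANCE_MANAGER = "RPM"

    def get_args_from_env(self) -> PsqlArgs:
        # each database reads only its own MD_* / RPM_* variables
        prefix = self.value
        return PsqlArgs(
            host=os.environ[f"{prefix}_DB_HOST"],
            port=os.environ[f"{prefix}_DB_PORT"],
            name=os.environ[f"{prefix}_DB_NAME"],
            user=os.environ[f"{prefix}_DB_USER"],
            password=os.environ[f"{prefix}_DB_PASSWORD"],
        )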
7 changes: 4 additions & 3 deletions docker-compose.yml
@@ -3,7 +3,7 @@ version: '3'
services:

rail_pm_rds:
container_name: rail_pm_rds
container_name: ${RPM_DB_HOST}
image: postgres:14.4
env_file: .env
shm_size: '2gb'
@@ -15,8 +15,8 @@ services:
command: ["postgres", "-c", "log_statement=all"]

metadata_rds:
container_name: metadata_rds
image: postgres:14.4
container_name: ${MD_DB_HOST}
image: postgres:15
env_file: .env
shm_size: '2gb'
environment:
@@ -45,6 +45,7 @@ services:
build:
context: ./python_src
depends_on:
- rail_pm_rds
- metadata_rds
working_dir: /lamp
volumes:
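Using ${MD_DB_HOST} / ${RPM_DB_HOST} as container names means the .env hostnames resolve directly inside the compose network. A hedged sketch of the host fallback the commit message describes for running the performance manager directly on the command line; the helper name and the /.dockerenv check are assumptions, not the committed postgres_utils logic:

# sketch only -- the committed DatabaseManager host handling is not in this diff
import os


def db_host_and_port(host_env: str, port_env: str) -> tuple[str, str]:
    """Resolve a host/port that works inside compose and on the command line."""
    host = os.environ[host_env]  # e.g. local_md_rds / local_rpm_rds
    port = os.environ[port_env]  # e.g. 5433 / 5434

    # crude assumption: containers expose a /.dockerenv marker file
    running_in_docker = os.path.exists("/.dockerenv")

    if not running_in_docker and host.startswith("local_"):
        # on the host command line the containers are only reachable through
        # the ports docker-compose publishes on localhost
        host = "127.0.0.1"

    return host, port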
10 changes: 10 additions & 0 deletions python_src/alembic.ini
@@ -78,6 +78,16 @@ sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
version_locations = src/lamp_py/migrations/versions/performance_manager_prod

[metadata_staging]
sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
version_locations = src/lamp_py/migrations/versions/metadata_staging

[metadata_prod]
sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
version_locations = src/lamp_py/migrations/versions/metadata_prod

[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
7 changes: 6 additions & 1 deletion python_src/src/lamp_py/ingestion/pipeline.py
@@ -11,6 +11,7 @@
from lamp_py.aws.ecs import handle_ecs_sigterm, check_for_sigterm
from lamp_py.aws.s3 import file_list_from_s3
from lamp_py.postgres.postgres_utils import start_rds_writer_process
from lamp_py.runtime_utils.alembic_migration import alembic_upgrade_to_head
from lamp_py.runtime_utils.env_validation import validate_environment
from lamp_py.runtime_utils.process_logger import ProcessLogger

@@ -78,10 +79,14 @@ def start() -> None:
"ERROR_BUCKET",
"INCOMING_BUCKET",
"SPRINGBOARD_BUCKET",
"ALEMBIC_MD_DB_NAME",
],
validate_db=True,
db_prefixes=["MD"],
)

# run metadata rds migrations
alembic_upgrade_to_head(db_name=os.environ["ALEMBIC_MD_DB_NAME"])

# run the main method
main()

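The new startup call runs the metadata migrations against the alembic.ini section named by ALEMBIC_MD_DB_NAME (metadata_prod above). A hedged sketch of what alembic_upgrade_to_head might look like; the real helper in lamp_py.runtime_utils.alembic_migration is not shown here, and the alembic.ini path is an assumption:

# sketch only -- the committed alembic_migration helper is not in this diff
import os

from alembic import command
from alembic.config import Config


def alembic_upgrade_to_head(db_name: str) -> None:
    """Upgrade the named alembic.ini section (e.g. metadata_prod) to head."""
    # locating alembic.ini relative to this module is an assumption
    here = os.path.dirname(os.path.abspath(__file__))
    ini_path = os.path.join(here, "..", "..", "..", "alembic.ini")

    alembic_cfg = Config(ini_path, ini_section=db_name)
    command.upgrade(alembic_cfg, "head")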
6 changes: 6 additions & 0 deletions python_src/src/lamp_py/migrations/env.py
@@ -25,16 +25,22 @@
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from lamp_py.postgres.rail_performance_manager_schema import RpmSqlBase
from lamp_py.postgres.metadata_schema import MetadataSqlBase

# using dictionary for engine and target_metadata to support migrating multiple dbs
# each dictionary name should have a section defined in alembic.ini that
# matches the key used in the db_details dictionary
rpm_psql_args = DatabaseIndex.RAIL_PERFORMANCE_MANAGER.get_args_from_env()
md_psql_args = DatabaseIndex.METADATA.get_args_from_env()
db_details = {
"performance_manager": {
"engine": rpm_psql_args.get_local_engine(),
"target_metadata": RpmSqlBase.metadata,
},
"metadata": {
"engine": md_psql_args.get_local_engine(),
"target_metadata": MetadataSqlBase.metadata,
},
}

# other values from the config, defined by the needs of env.py,
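A hedged sketch of how the rest of env.py (unchanged and not shown) might pick an entry from db_details based on the selected alembic.ini section, assuming the section name (e.g. metadata_prod) starts with the dictionary key:

# sketch only: the selection below is an assumption about the unchanged part of env.py
from alembic import context


def run_migrations_online() -> None:
    """Run migrations against whichever database the chosen ini section names."""
    # e.g. "metadata_prod" or "performance_manager_staging"
    section = context.config.config_ini_section

    # map the section back onto a db_details key defined above
    key = next(k for k in db_details if section.startswith(k))
    details = db_details[key]

    with details["engine"].connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=details["target_metadata"],
        )
        with context.begin_transaction():
            context.run_migrations()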
@@ -0,0 +1,98 @@
"""initial changes
Revision ID: 07903947aabe
Revises:
Create Date: 2023-12-11 15:12:47.261091
"""
from alembic import op
from sqlalchemy.exc import ProgrammingError
import logging
import sqlalchemy as sa

from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
from lamp_py.postgres.metadata_schema import MetadataLog

# revision identifiers, used by Alembic.
revision = "07903947aabe"
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"metadata_log",
sa.Column("pk_id", sa.Integer(), nullable=False),
sa.Column("rail_pm_processed", sa.Boolean(), nullable=True),
sa.Column("rail_pm_process_fail", sa.Boolean(), nullable=True),
sa.Column("path", sa.String(length=256), nullable=False),
sa.Column(
"created_on",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.PrimaryKeyConstraint("pk_id"),
sa.UniqueConstraint("path"),
)
op.create_index(
"ix_metadata_log_not_processed",
"metadata_log",
["path"],
unique=False,
postgresql_where=sa.text("rail_pm_processed = false"),
)

# pull metadata from the rail performance manager database into the
# metadata database. the table may or may not exist, so wrap this in a try
# except
try:
rpm_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)

insert_data = []
# pull metadata from the rail performance manager database via direct
# sql query. the metadata_log table may or may not exist.
with rpm_db_manager.session.begin() as session:
result = session.execute(
"SELECT path, processed, process_fail FROM metadata_log"
)
for row in result:
(path, processed, process_fail) = row
insert_data.append(
{
"path": path,
"rail_pm_processed": processed,
"rail_pm_process_fail": process_fail,
}
)

except ProgrammingError as error:
# Error 42P01 is an 'Undefined Table' error. This occurs when there is
# no metadata_log table in the rail performance manager database
#
# Raise all other sql errors
insert_data = []
if error.orig.pgcode == "42P01":
logging.info("No Metadata Table in Rail Performance Manager")
else:
raise

# insert data into the metadata database
if insert_data:
op.bulk_insert(MetadataLog.__table__, insert_data)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"ix_metadata_log_not_processed",
table_name="metadata_log",
)
op.drop_table("metadata_log")
# ### end Alembic commands ###
@@ -0,0 +1,98 @@
"""initial changes
Revision ID: 07903947aabe
Revises:
Create Date: 2023-12-11 15:12:47.261091
"""
from alembic import op
from sqlalchemy.exc import ProgrammingError
import logging
import sqlalchemy as sa

from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
from lamp_py.postgres.metadata_schema import MetadataLog

# revision identifiers, used by Alembic.
revision = "07903947aabe"
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"metadata_log",
sa.Column("pk_id", sa.Integer(), nullable=False),
sa.Column("rail_pm_processed", sa.Boolean(), nullable=True),
sa.Column("rail_pm_process_fail", sa.Boolean(), nullable=True),
sa.Column("path", sa.String(length=256), nullable=False),
sa.Column(
"created_on",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.PrimaryKeyConstraint("pk_id"),
sa.UniqueConstraint("path"),
)
op.create_index(
"ix_metadata_log_not_processed",
"metadata_log",
["path"],
unique=False,
postgresql_where=sa.text("rail_pm_processed = false"),
)

# pull metadata from the rail performance manager database into the
# metadata database. the table may or may not exist, so wrap this in a try
# except
try:
rpm_db_manager = DatabaseManager(
db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)

insert_data = []
# pull metadata from the rail performance manager database via direct
# sql query. the metadata_log table may or may not exist.
with rpm_db_manager.session.begin() as session:
result = session.execute(
"SELECT path, processed, process_fail FROM metadata_log"
)
for row in result:
(path, processed, process_fail) = row
insert_data.append(
{
"path": path,
"rail_pm_processed": processed,
"rail_pm_process_fail": process_fail,
}
)

except ProgrammingError as error:
# Error 42P01 is an 'Undefined Table' error. This occurs when there is
# no metadata_log table in the rail performance manager database
#
# Raise all other sql errors
insert_data = []
if error.orig.pgcode == "42P01":
logging.info("No Metadata Table in Rail Performance Manager")
else:
raise

# insert data into the metadata database
if insert_data:
op.bulk_insert(MetadataLog.__table__, insert_data)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"ix_metadata_log_not_processed",
table_name="metadata_log",
)
op.drop_table("metadata_log")
# ### end Alembic commands ###
@@ -31,29 +31,6 @@

def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"metadata_log",
sa.Column("pk_id", sa.Integer(), nullable=False),
sa.Column("processed", sa.Boolean(), nullable=True),
sa.Column("process_fail", sa.Boolean(), nullable=True),
sa.Column("path", sa.String(length=256), nullable=False),
sa.Column(
"created_on",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.PrimaryKeyConstraint("pk_id"),
sa.UniqueConstraint("path"),
)
op.create_index(
"ix_metadata_log_not_processed",
"metadata_log",
["path"],
unique=False,
postgresql_where=sa.text("processed = false"),
)

op.create_table(
"static_calendar",
sa.Column("pk_id", sa.Integer(), nullable=False),
@@ -574,7 +551,4 @@ def downgrade() -> None:
"ix_static_calendar_composite_1", table_name="static_calendar"
)
op.drop_table("static_calendar")

op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log")
op.drop_table("metadata_log")
# ### end Alembic commands ###
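
The rail performance manager migration that actually drops metadata_log (mentioned in the commit message but not among the diffs shown) presumably verifies the copy before dropping. A hedged sketch of that guard; the revision identifiers, row-count comparison, and use of DatabaseManager here are assumptions:

# sketch of the "verify then drop" migration described in the commit message;
# revision ids are placeholders and the count check is an assumption
from alembic import op
import sqlalchemy as sa

from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager

revision = "xxxxxxxxxxxx"  # placeholder
down_revision = "xxxxxxxxxxxx"  # placeholder
branch_labels = None
depends_on = None


def upgrade() -> None:
    # rows already copied into the metadata database
    md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)
    with md_db_manager.session.begin() as session:
        md_count = session.execute(
            sa.text("SELECT COUNT(*) FROM metadata_log")
        ).scalar()

    # rows still sitting in the rail performance manager table
    rpm_count = (
        op.get_bind()
        .execute(sa.text("SELECT COUNT(*) FROM metadata_log"))
        .scalar()
    )

    if md_count < rpm_count:
        raise RuntimeError(
            "metadata_log rows are missing from the metadata database; "
            "refusing to drop the rail performance manager copy"
        )

    op.drop_index("ix_metadata_log_not_processed", table_name="metadata_log")
    op.drop_table("metadata_log")


def downgrade() -> None:
    # recreating the dropped table is out of scope for this sketch
    pass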