Metadata rds #201

Closed · wants to merge 9 commits
25 changes: 17 additions & 8 deletions .env
Contributor Author: new env variables used to distinguish what db connection params are for.

    @@ -1,23 +1,32 @@
     # helper to know if env is already loaded
     BOOTSTRAPPED=1

    -# database
    -DB_HOST=local_rds
    -DB_PORT=5432
    -DB_NAME=performance_manager
    -DB_USER=postgres
    -DB_PASSWORD=postgres
    -ALEMBIC_DB_NAME=performance_manager_prod
    +# metadata database
    +MD_DB_HOST=local_md_rds
    +MD_DB_PORT=5433
    +MD_DB_NAME=metadata
    +MD_DB_USER=postgres
    +MD_DB_PASSWORD=postgres
    +ALEMBIC_MD_DB_NAME=metadata_prod
    +
    +# performance manager database
    +RPM_DB_HOST=local_rpm_rds
    +RPM_DB_PORT=5434
    +RPM_DB_NAME=performance_manager
    +RPM_DB_USER=postgres
    +RPM_DB_PASSWORD=postgres
    +ALEMBIC_RPM_DB_NAME=performance_manager_prod

     # s3 locations
     SPRINGBOARD_BUCKET=mbta-ctd-dataplatform-dev-springboard
     ARCHIVE_BUCKET=mbta-ctd-dataplatform-dev-archive
     ERROR_BUCKET=mbta-ctd-dataplatform-dev-error
     INCOMING_BUCKET=mbta-ctd-dataplatform-dev-incoming

     # mbta-performance with personal access
     PUBLIC_ARCHIVE_BUCKET=mbta-ctd-dataplatform-dev-archive

     # Tableau
     TABLEAU_USER=DOUPDATE
     TABLEAU_PASSWORD=DOUPDATE
     TABLEAU_SERVER=http://awtabDEV02.mbta.com
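The `MD_`/`RPM_` prefixes make each database's connection parameters self-describing, so they can be collected generically by prefix. A minimal sketch of that idea (the `db_args_from_env` helper is hypothetical; in this PR the real lookup lives in `postgres_utils.py`):

```python
import os

def db_args_from_env(prefix: str) -> dict:
    """Collect the five standard connection params that share an env-var prefix."""
    return {
        "host": os.environ[f"{prefix}_DB_HOST"],
        "port": int(os.environ[f"{prefix}_DB_PORT"]),
        "name": os.environ[f"{prefix}_DB_NAME"],
        "user": os.environ[f"{prefix}_DB_USER"],
        "password": os.environ[f"{prefix}_DB_PASSWORD"],
    }

# simulate the metadata block of the .env file above
os.environ.update(
    {
        "MD_DB_HOST": "local_md_rds",
        "MD_DB_PORT": "5433",
        "MD_DB_NAME": "metadata",
        "MD_DB_USER": "postgres",
        "MD_DB_PASSWORD": "postgres",
    }
)
md_args = db_args_from_env("MD")
print(md_args["host"], md_args["port"])  # local_md_rds 5433
```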
41 changes: 30 additions & 11 deletions .github/workflows/ci_python.yaml
    @@ -67,21 +67,40 @@ jobs:
         needs: setup
         env:
           BOOTSTRAPPED: 1
    -      DB_HOST: localhost
    -      DB_PORT: 5432
    -      DB_NAME: pm_test
    -      DB_USER: postgres
    -      DB_PASSWORD: postgres
    -      ALEMBIC_DB_NAME: performance_manager_staging
    +      MD_DB_HOST: local_rds
    +      MD_DB_PORT: 5433
    +      MD_DB_NAME: metadata
    +      MD_DB_USER: postgres
    +      MD_DB_PASSWORD: postgres
    +      ALEMBIC_MD_DB_NAME: metadata_prod
    +      RPM_DB_HOST: local_rds
    +      RPM_DB_PORT: 5434
    +      RPM_DB_NAME: performance_manager
    +      RPM_DB_USER: postgres
    +      RPM_DB_PASSWORD: postgres
    +      ALEMBIC_RPM_DB_NAME: performance_manager_prod
         services:
    -      postgres:
    +      rpm_postgres:
             image: postgres:14.4
             ports:
    -          - 5432:5432
    +          - 5434:5432
             env:
    -          POSTGRES_PASSWORD: ${{env.DB_PASSWORD}}
    -          POSTGRES_USER: ${{env.DB_USER}}
    -          POSTGRES_DB: ${{env.DB_NAME}}
    +          POSTGRES_PASSWORD: ${{env.RPM_DB_PASSWORD}}
    +          POSTGRES_USER: ${{env.RPM_DB_USER}}
    +          POSTGRES_DB: ${{env.RPM_DB_NAME}}
             options: >-
               --health-cmd pg_isready
               --health-interval 10s
               --health-timeout 5s
               --health-retries 5
    +      md_postgres:
    +        image: postgres:14.4
    +        ports:
    +          - 5433:5432
    +        env:
    +          POSTGRES_PASSWORD: ${{env.MD_DB_PASSWORD}}
    +          POSTGRES_USER: ${{env.MD_DB_USER}}
    +          POSTGRES_DB: ${{env.MD_DB_NAME}}
    +        options: >-
    +          --health-cmd pg_isready
    +          --health-interval 10s
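In GitHub Actions, service containers are reachable from the job at `localhost` on the mapped host port, so the two databases are told apart purely by port (5434 for the rail performance manager, 5433 for metadata). A sketch of the resulting connection URLs (the `dsn` helper is illustrative, not project code):

```python
def dsn(user: str, password: str, host: str, port: int, name: str) -> str:
    """Assemble a libpq-style connection URL from its parts."""
    return f"postgresql://{user}:{password}@{host}:{port}/{name}"

# values mirror the workflow env above; host is localhost from the job's view
rpm_dsn = dsn("postgres", "postgres", "localhost", 5434, "performance_manager")
md_dsn = dsn("postgres", "postgres", "localhost", 5433, "metadata")
print(rpm_dsn)
```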
28 changes: 21 additions & 7 deletions docker-compose.yml
Contributor Author: two dbs in local docker compose.

    @@ -2,16 +2,28 @@ version: '3'

     services:

    -  local_rds:
    -    container_name: local_rds
    +  rail_pm_rds:
    +    container_name: ${RPM_DB_HOST}
         image: postgres:14.4
         env_file: .env
         shm_size: '2gb'
         environment:
    -      POSTGRES_DB: ${DB_NAME}
    -      POSTGRES_PASSWORD: ${DB_PASSWORD}
    +      POSTGRES_DB: ${RPM_DB_NAME}
    +      POSTGRES_PASSWORD: ${RPM_DB_PASSWORD}
         ports:
    -      - "5432:5432"
    +      - "${RPM_DB_PORT}:5432"
         command: ["postgres", "-c", "log_statement=all"]

    +  metadata_rds:
    +    container_name: ${MD_DB_HOST}
    +    image: postgres:15
    +    env_file: .env
    +    shm_size: '2gb'
    +    environment:
    +      POSTGRES_DB: ${MD_DB_NAME}
    +      POSTGRES_PASSWORD: ${MD_DB_PASSWORD}
    +    ports:
    +      - "${MD_DB_PORT}:5432"
    +    command: ["postgres", "-c", "log_statement=all"]
    +
       performance_manager:

Collaborator (on the `"${RPM_DB_PORT}:5432"` mapping): These alternate ports cause issues with local development; additional logic is required in postgres_utils.py when setting port variables (running inside/outside of docker/aws).
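The reviewer's port concern comes down to this: inside the compose network every postgres container still listens on 5432, while from the host the same service is reached via the mapped `${RPM_DB_PORT}`/`${MD_DB_PORT}`. A hypothetical sketch of the branching this forces (the function name and the `inside_docker` flag are assumptions, not the PR's actual postgres_utils.py code):

```python
import os

def resolve_db_port(prefix: str, inside_docker: bool) -> int:
    """Inside the compose network postgres listens on its default port 5432;
    from the host (or CI) it is reachable on the mapped ${PREFIX}_DB_PORT."""
    if inside_docker:
        return 5432
    return int(os.environ.get(f"{prefix}_DB_PORT", "5432"))

os.environ["RPM_DB_PORT"] = "5434"
print(resolve_db_port("RPM", inside_docker=False))  # 5434
```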
    @@ -20,7 +32,8 @@ services:
         build:
           context: ./python_src
         depends_on:
    -      - local_rds
    +      - rail_pm_rds
    +      - metadata_rds
         working_dir: /lamp
         volumes:
           - ~/.aws:/root/.aws:ro # map credentials to be used by boto3, read-only

    @@ -32,7 +45,8 @@ services:
         build:
           context: ./python_src
         depends_on:
    -      - local_rds
    +      - rail_pm_rds
    +      - metadata_rds
         working_dir: /lamp
         volumes:
           # map credentials to be used by boto3, read-only

Collaborator: seed_metadata container will need to depend on both db's since it invokes creation of both DatabaseManager objects.

Suggested change:

    -      - metadata_rds
    +      - metadata_rds
    +      - rail_pm_rds
10 changes: 10 additions & 0 deletions python_src/alembic.ini
    @@ -78,6 +78,16 @@ sqlalchemy.url = driver://user:pass@localhost/dbname
     script_location = src/lamp_py/migrations
     version_locations = src/lamp_py/migrations/versions/performance_manager_prod

    +[metadata_staging]
    +sqlalchemy.url = driver://user:pass@localhost/dbname
    +script_location = src/lamp_py/migrations
    +version_locations = src/lamp_py/migrations/versions/metadata_staging
    +
    +[metadata_prod]
    +sqlalchemy.url = driver://user:pass@localhost/dbname
    +script_location = src/lamp_py/migrations
    +version_locations = src/lamp_py/migrations/versions/metadata_prod
    +
     [post_write_hooks]
     # post_write_hooks defines scripts or Python functions that are run
     # on newly generated revision scripts. See the documentation for further

Contributor Author (on lines +86 to +90): add metadata db migrations.
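alembic.ini is plain INI, so each new section carries its own `version_locations`, and Alembic's standard `-n/--name` flag selects a section at invocation time (e.g. `alembic -n metadata_prod upgrade head`). A small sketch confirming the added sections parse as expected:

```python
import configparser

# a fragment of the sections added in this diff
ini = """
[metadata_prod]
sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
version_locations = src/lamp_py/migrations/versions/metadata_prod
"""

config = configparser.ConfigParser()
config.read_string(ini)
# each named section resolves to its own migration directory
print(config["metadata_prod"]["version_locations"])
```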
3 changes: 1 addition & 2 deletions python_src/pyproject.toml
    @@ -12,7 +12,6 @@ authors = [
     ingestion = 'lamp_py.ingestion.pipeline:start'
     performance_manager = 'lamp_py.performance_manager.pipeline:start'
     seed_metadata = 'lamp_py.postgres.seed_metadata:run'
    -snapshot = 'lamp_py.postgres.snapshot:run'
     hyper_update = 'lamp_py.tableau.pipeline:start_hyper_updates'

     [tool.poetry.dependencies]

Contributor Author: we don't use this so i removed it.

    @@ -80,6 +79,6 @@ max-line-length = 80
     min-similarity-lines = 10
     # ignore session maker as it gives pylint fits
     # https://github.com/PyCQA/pylint/issues/7090
    -ignored-classes = ['sqlalchemy.orm.session.sessionmaker','pyarrow.compute']
    +ignored-classes = ['sqlalchemy.orm.session.sessionmaker', 'pyarrow.compute']
     # ignore the migrations directory. its going to have duplication and _that is ok_.
     ignore-paths = ["^src/lamp_py/migrations/.*$"]

Contributor Author (on lines -83 to +82): yaml linting.
7 changes: 6 additions & 1 deletion python_src/src/lamp_py/ingestion/pipeline.py
    @@ -11,6 +11,7 @@
     from lamp_py.aws.ecs import handle_ecs_sigterm, check_for_sigterm
     from lamp_py.aws.s3 import file_list_from_s3
     from lamp_py.postgres.postgres_utils import start_rds_writer_process
    +from lamp_py.runtime_utils.alembic_migration import alembic_upgrade_to_head
     from lamp_py.runtime_utils.env_validation import validate_environment
     from lamp_py.runtime_utils.process_logger import ProcessLogger

    @@ -78,10 +79,14 @@ def start() -> None:
                 "ERROR_BUCKET",
                 "INCOMING_BUCKET",
                 "SPRINGBOARD_BUCKET",
    +            "ALEMBIC_MD_DB_NAME",
             ],
             validate_db=True,
    +        db_prefixes=["MD"],
         )

    +    # run metadata rds migrations
    +    alembic_upgrade_to_head(db_name=os.environ["ALEMBIC_MD_DB_NAME"])
    +
         # run the main method
         main()
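The new `db_prefixes` argument suggests `validate_environment` expands each prefix into the standard per-database variables before checking presence. A sketch of that expansion, under the assumption that five `${PREFIX}_DB_*` variables are required per prefix (the real implementation lives in `lamp_py.runtime_utils.env_validation`):

```python
import os

def validate_environment_sketch(required_variables, db_prefixes=()):
    """Hypothetical expansion: each prefix contributes HOST/PORT/NAME/USER/PASSWORD."""
    needed = list(required_variables)
    for prefix in db_prefixes:
        needed += [
            f"{prefix}_DB_{part}"
            for part in ("HOST", "PORT", "NAME", "USER", "PASSWORD")
        ]
    missing = sorted(var for var in needed if var not in os.environ)
    if missing:
        raise RuntimeError(f"missing environment variables: {missing}")
    return needed

os.environ.update(
    {
        "SPRINGBOARD_BUCKET": "mbta-ctd-dataplatform-dev-springboard",
        "MD_DB_HOST": "local_md_rds",
        "MD_DB_PORT": "5433",
        "MD_DB_NAME": "metadata",
        "MD_DB_USER": "postgres",
        "MD_DB_PASSWORD": "postgres",
    }
)
checked = validate_environment_sketch(["SPRINGBOARD_BUCKET"], db_prefixes=["MD"])
print(len(checked))  # 6
```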
15 changes: 11 additions & 4 deletions python_src/src/lamp_py/migrations/env.py
    @@ -2,7 +2,7 @@

     from alembic import context

    -from lamp_py.postgres.postgres_utils import get_local_engine
    +from lamp_py.postgres.postgres_utils import DatabaseIndex

     # this is the Alembic Config object, which provides
     # access to the values within the .ini file in use.

    @@ -24,15 +24,22 @@
     # for 'autogenerate' support
     # from myapp import mymodel
     # target_metadata = mymodel.Base.metadata
    -from lamp_py.postgres.postgres_schema import SqlBase
    +from lamp_py.postgres.rail_performance_manager_schema import RpmSqlBase
    +from lamp_py.postgres.metadata_schema import MetadataSqlBase

    +# using dictionary for engine and target_metadata to support migrating multiple dbs
    +# each dictionary name should have a section defined in alembic.ini that
    +# matches the key used in the db_details dictionary
    +rpm_psql_args = DatabaseIndex.RAIL_PERFORMANCE_MANAGER.get_args_from_env()
    +md_psql_args = DatabaseIndex.METADATA.get_args_from_env()
     db_details = {
         "performance_manager": {
    -        "engine": get_local_engine(),
    -        "target_metadata": SqlBase.metadata,
    +        "engine": rpm_psql_args.get_local_engine(),
    +        "target_metadata": RpmSqlBase.metadata,
         },
    +    "metadata": {
    +        "engine": md_psql_args.get_local_engine(),
    +        "target_metadata": MetadataSqlBase.metadata,
    +    },
     }
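Since alembic is invoked with an ini section name such as `metadata_prod` or `performance_manager_staging`, env.py presumably maps that section back onto a `db_details` key. A sketch of one way the mapping could work (the dispatch function and the placeholder values are assumptions, not the PR's code):

```python
# assumed shape of the lookup: section names are prefixed by a db_details key
db_details = {
    "performance_manager": {"target_metadata": "RpmSqlBase.metadata (placeholder)"},
    "metadata": {"target_metadata": "MetadataSqlBase.metadata (placeholder)"},
}

def details_for_section(section: str) -> dict:
    """Longest matching key wins, so 'metadata_prod' maps to 'metadata'."""
    matches = [key for key in db_details if section.startswith(key)]
    if not matches:
        raise KeyError(f"no db_details entry for section {section!r}")
    return db_details[max(matches, key=len)]

print(details_for_section("metadata_prod") is db_details["metadata"])  # True
```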
New file (Alembic migration 07903947aabe): @@ -0,0 +1,98 @@

    """initial changes

    Revision ID: 07903947aabe
    Revises:
    Create Date: 2023-12-11 15:12:47.261091

    """
    from alembic import op
    from sqlalchemy.exc import ProgrammingError
    import logging
    import sqlalchemy as sa

    from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
    from lamp_py.postgres.metadata_schema import MetadataLog

    # revision identifiers, used by Alembic.
    revision = "07903947aabe"
    down_revision = None
    branch_labels = None
    depends_on = None


    def upgrade() -> None:
        # ### commands auto generated by Alembic - please adjust! ###
        op.create_table(
            "metadata_log",
            sa.Column("pk_id", sa.Integer(), nullable=False),
            sa.Column("rail_pm_processed", sa.Boolean(), nullable=True),
            sa.Column("rail_pm_process_fail", sa.Boolean(), nullable=True),
            sa.Column("path", sa.String(length=256), nullable=False),
            sa.Column(
                "created_on",
                sa.DateTime(timezone=True),
                server_default=sa.text("now()"),
                nullable=True,
            ),
            sa.PrimaryKeyConstraint("pk_id"),
            sa.UniqueConstraint("path"),
        )
        op.create_index(
            "ix_metadata_log_not_processed",
            "metadata_log",
            ["path"],
            unique=False,
            postgresql_where=sa.text("rail_pm_processed = false"),
        )

        # pull metadata from the rail performance manager database into the
        # metadata database. the table may or may not exist, so wrap this in a try
        # except
        try:
            rpm_db_manager = DatabaseManager(
                db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
            )

            insert_data = []
            # pull metadata from the rail performance manager database via direct
            # sql query. the metadata_log table may or may not exist.
            with rpm_db_manager.session.begin() as session:
                result = session.execute(
                    "SELECT path, processed, process_fail FROM metadata_log"
                )
                for row in result:
                    (path, processed, process_fail) = row
                    insert_data.append(
                        {
                            "path": path,
                            "rail_pm_processed": processed,
                            "rail_pm_process_fail": process_fail,
                        }
                    )
        except ProgrammingError as error:
            # Error 42P01 is an 'Undefined Table' error. This occurs when there is
            # no metadata_log table in the rail performance manager database
            #
            # Raise all other sql errors
            insert_data = []
            if error.orig.pgcode == "42P01":
                logging.info("No Metadata Table in Rail Performance Manager")
            else:
                raise

        # insert data into the metadata database
        if insert_data:
            op.bulk_insert(MetadataLog.__table__, insert_data)

        # ### end Alembic commands ###


    def downgrade() -> None:
        # ### commands auto generated by Alembic - please adjust! ###
        op.drop_index(
            "ix_metadata_log_not_processed",
            table_name="metadata_log",
        )
        op.drop_table("metadata_log")
        # ### end Alembic commands ###
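The 42P01 handling in this migration is a reusable pattern: treat "undefined table" as an empty result and re-raise every other database error. A self-contained sketch with stand-in exception classes (so it runs without a database; the real code catches `sqlalchemy.exc.ProgrammingError`, whose `.orig` wraps the DBAPI error):

```python
import logging

class FakeDriverError(Exception):
    """Stand-in for the DBAPI error carrying a postgres error code."""
    def __init__(self, pgcode: str):
        self.pgcode = pgcode

class FakeProgrammingError(Exception):
    """Stand-in for sqlalchemy.exc.ProgrammingError; .orig wraps the DBAPI error."""
    def __init__(self, orig: FakeDriverError):
        self.orig = orig

def rows_or_empty(fetch):
    """Mirror the migration's pattern: 'undefined table' (42P01) means no rows."""
    try:
        return fetch()
    except FakeProgrammingError as error:
        if error.orig.pgcode == "42P01":
            logging.info("No Metadata Table in Rail Performance Manager")
            return []
        raise  # any other sql error propagates

def missing_table():
    raise FakeProgrammingError(FakeDriverError("42P01"))

print(rows_or_empty(missing_table))  # []
```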