Metadata rds #201
Conversation
Force-pushed from c0f871e to fc6ad50
new env variables used to distinguish which db each set of connection params is for.
.github/workflows/ci_python.yaml
two dbs are needed by the performance manager tests.
two dbs in local docker compose.
[metadata_prod]
sqlalchemy.url = driver://user:pass@localhost/dbname
script_location = src/lamp_py/migrations
version_locations = src/lamp_py/migrations/versions/metadata_prod
add metadata db migrations.
@@ -12,7 +12,6 @@ authors = [
 ingestion = 'lamp_py.ingestion.pipeline:start'
 performance_manager = 'lamp_py.performance_manager.pipeline:start'
 seed_metadata = 'lamp_py.postgres.seed_metadata:run'
-snapshot = 'lamp_py.postgres.snapshot:run'
we don't use this so i removed it.
-ignored-classes = ['sqlalchemy.orm.session.sessionmaker','pyarrow.compute']
+ignored-classes = ['sqlalchemy.orm.session.sessionmaker', 'pyarrow.compute']
yaml linting
alembic_upgrade_to_head(db_name="metadata_prod")
migrations for the metadata db are owned by ingestion. this should make our deployment experience better.
This does complicate local development: since we don't have (and don't really want) a local ingestion image, standing up the local metadata rds would need a different configuration.
One option would be to allow the `seed_metadata` image to do this, but it would require some re-working of that logic (probably some different env vars)
good note. let's discuss in person, but i think adding to seed_metadata makes sense.
# run metadata rds migrations
alembic_upgrade_to_head(db_name="metadata_prod")

move_metadata()
temp function to move all the metadata over from rail performance manager. we should only have to do this once.
I think we would want to do this in the migration file for the new metadata RDS, which essentially operates as a "temp" function.
my instinct was that it would be wrong to have a connection to the RPM database in the migration file as that migration could run when there is no longer a metadata table in that database.
initial migration to establish the metadata table in the database.
@@ -425,7 +425,10 @@ def update_events_from_temp(db_manager: DatabaseManager) -> None:
     process_logger.log_complete()


-def process_gtfs_rt_files(db_manager: DatabaseManager) -> None:
+def process_gtfs_rt_files(
we need both dbs now. one to figure out what files we have to extract and one to do the compute for the transformations.
def process_static_tables(
    rpm_db_manager: DatabaseManager,
    md_db_manager: DatabaseManager,
) -> None:
we need the md db manager to figure out what the new feed info files are and the rpm db manager as the target to load the new data into.
def legacy_metadata_exists(rpm_db_manager: DatabaseManager) -> bool:
    """
    are there any records in the legacy metadata table
    """
    metadata = rpm_db_manager.select_as_dataframe(
        sa.select(LegacyMetadataLog.path)
    )

    return not metadata.empty
check to see if there is legacy metadata that hasn't been copied over yet.
if legacy_metadata_exists(rpm_db_manager):
    raise EnvironmentError("Legacy Metadata Detected")
raise an exception if there is legacy metadata. this will prevent us from running any of the performance manager logic, waiting for the next loop to give it a shot.
@@ -3,10 +3,10 @@
 import sqlalchemy as sa
 from sqlalchemy.ext.declarative import declarative_base

-SqlBase: Any = declarative_base()
+RpmSqlBase: Any = declarative_base()
new base, one for each database.
@@ -177,7 +177,7 @@ class TempEventCompare(SqlBase):  # pylint: disable=too-few-public-methods
     )


-class MetadataLog(SqlBase):  # pylint: disable=too-few-public-methods
+class LegacyMetadataLog(RpmSqlBase):  # pylint: disable=too-few-public-methods
rename this to avoid two tables with the same name.
we weren't using this.
no functional changes. just creating the db managers and passing them in correctly and resetting the correct tables.
op.drop_index(
    "ix_metadata_log_not_processed",
    table_name="metadata_log",
    postgresql_where=sa.text("rail_pm_processed = false"),
I don't think this should be required when calling to drop the index; just the index name and table name should be sufficient.
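For illustration, a minimal sketch of the same point using plain SQLAlchemy against in-memory SQLite (the project's code uses Alembic's op.drop_index against Postgres; this just exercises the underlying Index.drop): the index name and its table are enough to drop it, and the partial-index WHERE clause matters only at creation time.

```python
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
log = sa.Table("metadata_log", meta, sa.Column("path", sa.String))
ix = sa.Index("ix_metadata_log_not_processed", log.c.path)

with engine.begin() as conn:
    meta.create_all(conn)  # creates the table and its index

    # dropping needs only the index identity; no WHERE clause is required
    ix.drop(conn)

    remaining = conn.execute(
        sa.text(
            "SELECT count(*) FROM sqlite_master "
            "WHERE type = 'index' AND name = 'ix_metadata_log_not_processed'"
        )
    ).scalar()
```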
-class MetadataLog(SqlBase):  # pylint: disable=too-few-public-methods
+class LegacyMetadataLog(RpmSqlBase):  # pylint: disable=too-few-public-methods
Should we just be dropping this table?
i think that depends on the deployment order.
we'll discuss in a call.
def legacy_metadata_exists(rpm_db_manager: DatabaseManager) -> bool:
    """
    are there any records in the legacy metadata table
    """
    metadata = rpm_db_manager.select_as_dataframe(
        sa.select(LegacyMetadataLog.path)
    )

    return not metadata.empty
I don't think this is really required. If everything in the performance manager pipeline is now configured to read from the new MetadataLog table, whether or not the LegacyMetadataLog exists shouldn't matter.
i was worried about some deployment error on our part and wanted to protect against that. i had planned on taking this out on a subsequent PR where everything is complete.
I think we need to discuss the migration process in a little more detail along with the plan for local development with the MetadataLog table residing in a different DB instance.
Before deployment, we should also change the configuration of the metadata-rds terraform module. The current metadata log table on the dev instance is 34 MB in size, so something like an allocated_storage of 2 gibibytes and max_allocated_storage of 5 should be sufficient. The m5.large instance class is also probably overkill for a DB with one table; should drop that down to something like t3.small
docker-compose.yml
metadata_rds:
  container_name: metadata_rds
  image: postgres:14.4
The terraform module has postgres 15 set for metadata-rds; need to sync this between the two.
Force-pushed from 70a7932 to 3d7718f
@@ -88,8 +85,6 @@ def start() -> None:
     # run metadata rds migrations
     alembic_upgrade_to_head(db_name="metadata_prod")

-    move_metadata()
do this in migration now.
@@ -40,6 +45,52 @@ def upgrade() -> None:
         unique=False,
         postgresql_where=sa.text("rail_pm_processed = false"),
     )

+    # pull metadata from the rail performance manager database into the
i think this works how we would like. it uses a text-based query to try and select metadata from the rail performance manager and eats an exception iff it's caused by an "undefined table" error, i.e. the metadata log table does not exist.
@@ -0,0 +1,67 @@
+"""remove_metadata
new migration to remove metadata table from performance manager db.
@@ -9,11 +9,8 @@
 import signal
 from typing import List

 import sqlalchemy as sa
reset this file back to how it was.
-    METADATA = auto
+    METADATA = auto()
     RAIL_PERFORMANCE_MANAGER = auto()

-    def get_env_prefix(self) -> str:
+    @property
+    def env_prefix(self) -> str:
         """
         in the environment, all keys for this database have this prefix
         """
-        if self == self.RAIL_PERFORMANCE_MANAGER:
+        if self is DatabaseIndex.RAIL_PERFORMANCE_MANAGER:
             return "RPM"
-        if self == self.METADATA:
+        if self is DatabaseIndex.METADATA:
             return "MD"
         raise NotImplementedError("No environment prefix for index {self.name}")

     def get_args_from_env(self) -> PsqlArgs:
         """
         generate a sql argument instance for this ind
         """
-        prefix = self.get_env_prefix()
-        return PsqlArgs(prefix)
+        return PsqlArgs(self.env_prefix)
more correct way of doing this. (and fix the stupid auto typo i left)
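The pattern in the diff, sketched standalone (PsqlArgs omitted; a missing f-prefix on the error string in the diff is also fixed here, flagged as an assumption about intent):

```python
from enum import Enum, auto


class DatabaseIndex(Enum):
    """index of the databases this project talks to"""

    METADATA = auto()  # auto() with parens; bare `auto` was the typo
    RAIL_PERFORMANCE_MANAGER = auto()

    @property
    def env_prefix(self) -> str:
        """in the environment, all keys for this database have this prefix"""
        if self is DatabaseIndex.RAIL_PERFORMANCE_MANAGER:
            return "RPM"
        if self is DatabaseIndex.METADATA:
            return "MD"
        # f-string added here; the diff's message would not interpolate
        raise NotImplementedError(f"No environment prefix for index {self.name}")
```

Usage is then `DatabaseIndex.METADATA.env_prefix`, which reads like an attribute rather than a getter call.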
def add_metadata_paths(self, paths: List[str]) -> None:
    """
    add metadata filepaths to metadata table for testing
    """
    print(paths)
    with self.session.begin() as session:
        session.execute(
            sa.insert(MetadataLog.__table__),
            [{"path": p} for p in paths],
        )
moved this out of db manager class since it only makes sense for one of the databases.
@@ -177,27 +177,6 @@ class TempEventCompare(RpmSqlBase):  # pylint: disable=too-few-public-methods
     )


-class LegacyMetadataLog(RpmSqlBase):  # pylint: disable=too-few-public-methods
👋
Force-pushed from 3d7718f to 47dad0e
Most changes related to getting the local environment up and running correctly.
Probably need one of these for metadata_staging also?
with rpm_db_manager.session.begin() as session:
    result = session.execute(
        "SELECT path, processed, process_fail FROM metadata_log"
    )
    for row in result:
        (path, processed, process_fail) = row
        insert_data.append(
            {
                "path": path,
                "rail_pm_processed": processed,
                "rail_pm_process_fail": process_fail,
            }
        )
nitpick: the select_as_list method of DatabaseManager does the same thing as this block.
Ah, nevermind, I see you're renaming the dictionary values here.
    psql_insert(MetadataLog.__table__)
    .values(insert_data)
    .on_conflict_do_nothing()
)
Do we want to do nothing on a conflict during this operation? It seems like a conflict would indicate some process failure.
i'm a little concerned about a case where this migration gets run out of sync with the migration on the performance manager database and we find ourselves in a situation where we can't do anything to unblock prod since we can't manually adjust the databases. ingestion should only ever have flags marked as false, so i don't think we would be at risk of missing processing of incoming data.
happy to revert this though if you think it's risky.
op.create_table(
    "static_service_id_lookup",
    sa.Column("dummy_pk", sa.Integer(), nullable=False),
    sa.Column("route_id", sa.String(length=60), nullable=True),
    sa.Column("service_id", sa.String(length=60), nullable=True),
    sa.Column("service_date", sa.Integer(), nullable=True),
    sa.Column("static_version_key", sa.Integer(), nullable=True),
    sa.PrimaryKeyConstraint("dummy_pk"),
)
any migration operations for the static_service_id_lookup VIEW should be removed.
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
The logic you previously had, to check if records exist in the new metadata log table before starting the event loop, could be added into these migration files for performance manager.
But that shouldn't really be required, since the PM event loop will just query an empty table, until it's not empty.
Actually, we need to check if records exist in the new metadata log table before executing the drop table command, otherwise the table could be dropped before the records are transferred to the new metadata_log table.
@@ -32,7 +45,7 @@ services:
     build:
       context: ./python_src
     depends_on:
-      - local_rds
+      - metadata_rds
seed_metadata container will need to depend on both dbs since it invokes creation of both DatabaseManager objects
Suggested change:
       - metadata_rds
+      - rail_pm_rds
 ports:
-  - "5432:5432"
+  - "${RPM_DB_PORT}:5432"
These alternate ports cause issues with local development; additional logic is required in postgres_utils.py when setting port variables (running inside/outside of docker/aws)
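One way that port logic could look, as a sketch only: the env var names (RUNNING_IN_DOCKER, RPM_DB_HOST, RPM_DB_PORT, etc.) and the in-docker detection flag are assumptions for illustration, not the project's actual postgres_utils.py.

```python
import os


def resolve_db_endpoint(prefix: str) -> tuple:
    """
    Resolve (host, port) for the database with the given env prefix
    ("RPM" or "MD"). Inside the compose network, services reach postgres
    by service name on the container port 5432; from the host command
    line, localhost plus the mapped port from .env must be used instead.
    """
    # hypothetical flag set only inside the docker-compose environment
    if os.getenv("RUNNING_IN_DOCKER"):
        return (os.environ[f"{prefix}_DB_HOST"], 5432)
    return ("127.0.0.1", int(os.environ[f"{prefix}_DB_PORT"]))
```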
# run metadata rds migrations
alembic_upgrade_to_head(db_name="metadata_prod")
db_name should be pulled from env var here, and then the env var added to the validate_environment function
@@ -102,7 +107,7 @@ def start() -> None:
         validate_db=True,
     )

-    # run rds migrations
+    # run rail performance manager rds migrations
     alembic_upgrade_to_head(db_name=os.getenv("ALEMBIC_DB_NAME"))
update to ALEMBIC_RPM_DB_NAME along with the same input to validate_environment. The validate_environment function also requires updates because of the assumed validate_db environment variables.
md_db_manager = DatabaseManager(
    db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER, verbose=args.verbose
)
Suggested change:
-md_db_manager = DatabaseManager(
-    db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER, verbose=args.verbose
-)
+md_db_manager = DatabaseManager(
+    db_index=DatabaseIndex.METADATA, verbose=args.verbose
+)
We're going to have two RDS schemas to maintain, one for the rail performance manager and one for the metadata. Move the rail performance manager into its own file that's better named. Remove the snapshot postgres feature since we don't use it.
* create the schema with a different sql base class.
* create the migrations and update alembic env and ini.
* use it for inserting metadata in the ingestion app.
* use it for getting unprocessed files in the rail performance manager application.
* update tests to use both database managers
We need existing metadata to be copied from the rail performance manager rds to the metadata rds. Do so in the ingestion application before starting the main method. In the performance manager, don't enter the biz logic of an event loop until the legacy metadata table has been copied over.
Squash this with the docker-compose commit before merging. yamllint docker-compose as well.
* add proper migrations to seed metadata script
* fix mypy error in DataBaseIndex enum
* update metadata postgres version in yaml files
* insert metadata from rail performance manager rds in migrations script.
* remove legacy metadata table from rail performance manager schema
* Better handling of hostnames and ports so that they work when the local performance manager is running in a docker image and directly on the command line.
* Create a metadata_staging migrations directory.
* In the performance manager staging migration, ensure that all metadata has been copied over to the metadata database before dropping the metadata table.
* Get alembic migration names from the environment.
* Improve environment validation to handle multiple databases.
Force-pushed from 47dad0e to 61323f8
@@ -8,7 +8,7 @@ def validate_environment(
     required_variables: List[str],
     private_variables: Optional[List[str]] = None,
     optional_variables: Optional[List[str]] = None,
-    validate_db: bool = False,
+    db_prefixes: Optional[List[str]] = None,
nitpick:

Suggested change:
-    db_prefixes: Optional[List[str]] = None,
+    db_prefixes: Iterable[str] = (),

I know this is following the convention of private_variables and optional_variables because pylint complains of a default parameter value of [], but Iterable[str] = () could be used for all of those parameters and we could drop the if checks before our for loops and the private_variables = [] assignment at the start of the function.
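The nitpick in runnable form (the function name and signature here are illustrative, not the project's validate_environment): an immutable () default avoids pylint's mutable-default warning and makes the None guards unnecessary, since an empty tuple iterates as empty.

```python
from typing import Iterable, List


def missing_variables(
    environ: dict,
    required_variables: Iterable[str],
    private_variables: Iterable[str] = (),  # () instead of Optional[List] = None
    optional_variables: Iterable[str] = (),
) -> List[str]:
    """
    return names that must be present in environ but are not.
    no `if private_variables is None` guard is needed: iterating ()
    simply does nothing.
    """
    missing = [name for name in required_variables if name not in environ]
    missing += [name for name in private_variables if name not in environ]
    # optional variables never contribute to the missing list
    return missing
```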
prefix file names with 001_ to help with sorting future migrations, same for the metadata_staging migration file
Almost there!
The last significant issue is the insert operation on the new metadata table. Everything else appears to be running correctly for me, locally.
@@ -22,6 +22,7 @@ def get_alembic_config(db_name: str) -> Config:
     "performance_manager_dev",
     "performance_manager_staging",
     "performance_manager_prod",
+    "metadata_prod",
Add metadata_staging
try:
    rpm_db_manager = DatabaseManager(
        db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
    )
    md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)

    insert_data = []
    # pull metadata from the rail performance manager database via direct
    # sql query. the metadata_log table may or may not exist.
    with rpm_db_manager.session.begin() as session:
        result = session.execute(
            "SELECT path, processed, process_fail FROM metadata_log"
        )
        for row in result:
            (path, processed, process_fail) = row
            insert_data.append(
                {
                    "path": path,
                    "rail_pm_processed": processed,
                    "rail_pm_process_fail": process_fail,
                }
            )

    # insert data into the metadata database
    with md_db_manager.session.begin() as session:
        # do nothing on conflicts in file paths
        result = session.execute(
            sa.insert(MetadataLog.__table__).values(insert_data)
        )

except ProgrammingError as error:
    # Error 42P01 is an 'Undefined Table' error. This occurs when there is
    # no metadata_log table in the rail performance manager database
    #
    # Raise all other sql errors
    if error.orig.pgcode == "42P01":
        logging.info("No Metadata Table in Rail Performance Manager")
    else:
        raise
Suggested change:

try:
    rpm_db_manager = DatabaseManager(
        db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
    )

    insert_data = []
    # pull metadata from the rail performance manager database via direct
    # sql query. the metadata_log table may or may not exist.
    with rpm_db_manager.session.begin() as session:
        result = session.execute(
            "SELECT path, processed, process_fail FROM metadata_log"
        )
        for row in result:
            (path, processed, process_fail) = row
            insert_data.append(
                {
                    "path": path,
                    "rail_pm_processed": processed,
                    "rail_pm_process_fail": process_fail,
                }
            )

except ProgrammingError as error:
    # Error 42P01 is an 'Undefined Table' error. This occurs when there is
    # no metadata_log table in the rail performance manager database
    #
    # Raise all other sql errors
    insert_data = []
    if error.orig.pgcode == "42P01":
        logging.info("No Metadata Table in Rail Performance Manager")
    else:
        raise

if insert_data:
    op.bulk_insert(MetadataLog.__table__, insert_data)
This insert operation needs to use op.bulk_insert or something similar to insert anything into the Metadata table that was created during this migration operation. An outside session will not be able to see the Metadata table until the upgrade has completed and all operations have been committed.
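The visibility point can be seen with plain SQLAlchemy (a SQLite stand-in, not the project's code): rows have to go in over the same connection whose open transaction created the table, which is what op.bulk_insert does by reusing the migration's connection.

```python
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
log = sa.Table(
    "metadata_log",
    meta,
    sa.Column("path", sa.String, primary_key=True),
)

insert_data = [{"path": "s3://a"}, {"path": "s3://b"}]

with engine.begin() as conn:
    # the table exists only inside this still-open transaction so far
    meta.create_all(conn)
    # same connection, so the new table is visible to the insert
    conn.execute(sa.insert(log), insert_data)
    count = conn.execute(
        sa.select(sa.func.count()).select_from(log)
    ).scalar()
```

A separate session opened before the transaction commits would not see metadata_log at all, which is why the sa.insert through md_db_manager could not work inside the migration.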
missing_paths = legacy_paths - modern_paths
if len(missing_paths) != 0:
    raise Exception(
        f"Detected {len(missing_paths)} in Legacy Metadata Table not found in Metadata Database."
    )
non-blocking:
This failure mechanism will result in the ECS dying and continually restarting if performance manager is trying to run before the ingestion migration completes. An alternative would be this check running in a while loop and breaking once there are no outstanding legacy_paths, compared to modern_paths.
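A sketch of that alternative (the function and the injected check callable are illustrative names, not the project's code): poll for outstanding legacy paths instead of raising and letting ECS restart the task.

```python
import time
from typing import Callable, Set


def wait_for_metadata_copy(
    get_missing_paths: Callable[[], Set[str]],
    poll_seconds: float = 0.0,
    max_attempts: int = 60,
) -> bool:
    """
    poll until every legacy path shows up in the new metadata table.
    returns True once nothing is outstanding, False if we give up
    after max_attempts, leaving the caller to decide whether to raise.
    """
    for _ in range(max_attempts):
        if not get_missing_paths():
            return True
        time.sleep(poll_seconds)
    return False
```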
* rename migration files to use standard format
* enter while loop to detect legacy metadata that's not in the metadata database. this will keep the service running until the copy is complete rather than stopping the service and restarting it.
* Use alembic.op to bulk insert metadata in the Metadata migration rather than sqlalchemy insert.
One more change..
# insert data into the metadata database
if insert_data:
    op.bulk_insert(MetadataLog.__table, insert_data)
Suggested change:
-    op.bulk_insert(MetadataLog.__table, insert_data)
+    op.bulk_insert(MetadataLog.__table__, insert_data)
i'm mad that mypy didn't catch this one. maybe that's why we need sqlalchemy 2.0
# insert data into the metadata database
if insert_data:
    op.bulk_insert(MetadataLog.__table, insert_data)
Suggested change:
-    op.bulk_insert(MetadataLog.__table, insert_data)
+    op.bulk_insert(MetadataLog.__table__, insert_data)
rpm_db_manager = DatabaseManager(
    db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)
md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)
non-blocking:
md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)
same with this, but pylint.
rpm_db_manager = DatabaseManager(
    db_index=DatabaseIndex.RAIL_PERFORMANCE_MANAGER
)
md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)
non-blocking:
md_db_manager = DatabaseManager(db_index=DatabaseIndex.METADATA)
Force-pushed from f76f9bd to a9e0338
Force-pushed from a9e0338 to 6f2f519
replaced by #224
Create a Metadata RDS for the project that can be used by multiple applications to query the status of files stored on S3
Asana Task: https://app.asana.com/0/1189492770004753/1205798784639666/f