Skip to content

Commit

Permalink
Merge pull request #1416 from microbiomedata/issue-1415-workflow-type-migration
Browse files Browse the repository at this point in the history

Migrate `workflow_type` values in `data_object` table
  • Loading branch information
pkalita-lbl authored Oct 11, 2024
2 parents fd8ff76 + 5b1a995 commit e81b583
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 9 deletions.
1 change: 0 additions & 1 deletion nmdc_server/alembic.ini
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ keys = console
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

Expand Down
7 changes: 4 additions & 3 deletions nmdc_server/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from nmdc_server.config import Settings
from nmdc_server.database import SessionLocalIngest
from nmdc_server.ingest import errors
from nmdc_server.logger import get_logger


@click.group()
Expand Down Expand Up @@ -75,9 +74,7 @@ def ingest(verbose, function_limit, skip_annotation, swap_rancher_secrets):
level = logging.INFO
elif verbose > 1:
level = logging.DEBUG
logger = get_logger(__name__)
logging.basicConfig(level=level, format="%(message)s")
logger.setLevel(logging.INFO)

jobs.do_ingest(function_limit, skip_annotation)

Expand Down Expand Up @@ -269,3 +266,7 @@ def load_db(key_file, user, host, list_backups, backup_file):
sys.exit(1)

click.secho(f"\nSuccessfully loaded {settings.current_db_uri}", fg="green")


if __name__ == "__main__":
cli()
2 changes: 0 additions & 2 deletions nmdc_server/ingest/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,3 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading search indices")
search_index.load(db)
db.commit()

logger.info("Ingest finished successfully")
8 changes: 5 additions & 3 deletions nmdc_server/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ def ping():

def update_nmdc_functions():
"""Update NMDC custom functions for both databases."""
logger = get_logger(__name__)
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger.setLevel(logging.INFO)
for db_info in [(database.SessionLocal, "active"), (database.SessionLocalIngest, "ingest")]:
db_to_update, db_type = db_info
with db_to_update() as db:
Expand Down Expand Up @@ -84,10 +81,15 @@ def do_ingest(function_limit, skip_annotation):
load(ingest_db, function_limit=function_limit, skip_annotation=skip_annotation)

# copy persistent data from the production db to the ingest db
logger.info("Merging file_download")
maybe_merge_download_artifact(ingest_db, prod_db.query(models.FileDownload))
logger.info("Merging bulk_download")
maybe_merge_download_artifact(ingest_db, prod_db.query(models.BulkDownload))
logger.info("Merging bulk_download_data_object")
maybe_merge_download_artifact(ingest_db, prod_db.query(models.BulkDownloadDataObject))

logger.info("Ingest finished successfully")


@celery_app.task
def ingest(function_limit=None, skip_annotation=False):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Rename data object workflow types
The Berkeley schema migration changed a number of workflow type names. Even though
the `data_object` table is truncated at the start of the ingest process, the old
workflow types were being introduced by a merge operation from the live database
to the ingest database.
See also: https://github.com/microbiomedata/nmdc-server/issues/1415
Revision ID: e54d37bfb90b
Revises: 2ec2d0b4f840
Create Date: 2024-10-11 18:06:08.521445
"""

from typing import Optional

import sqlalchemy as sa
from alembic import op
from sqlalchemy.sql import column, table

# revision identifiers, used by Alembic.
revision: str = "e54d37bfb90b"
down_revision: Optional[str] = "2ec2d0b4f840"
branch_labels: Optional[str] = None
depends_on: Optional[str] = None


# Old-to-new `workflow_type` renames introduced by the Berkeley schema
# migration. Each entry maps the pre-migration name ("old") to the
# post-migration name ("new"); `upgrade` applies old -> new and
# `downgrade` reverses it.
WORKFLOW_TYPE_MAP = [
    {"old": old_name, "new": new_name}
    for old_name, new_name in (
        ("nmdc:MAGsAnalysisActivity", "nmdc:MagsAnalysis"),
        ("nmdc:MetabolomicsAnalysisActivity", "nmdc:MetabolomicsAnalysis"),
        ("nmdc:MetaProteomicAnalysis", "nmdc:MetaproteomicAnalysis"),
        ("nmdc:metaT", "nmdc:MetatranscriptomeAnalysis"),
        ("nmdc:NomAnalysisActivity", "nmdc:NomAnalysis"),
        ("nmdc:ReadbasedAnalysis", "nmdc:ReadBasedTaxonomyAnalysis"),
        ("nmdc:ReadQCAnalysisActivity", "nmdc:ReadQcAnalysis"),
    )
]


def upgrade():
    """Rename each old `workflow_type` value in `data_object` to its new name.

    Issues one UPDATE per entry in WORKFLOW_TYPE_MAP, rewriting rows whose
    `workflow_type` matches the pre-migration name.
    """
    # Lightweight table stub — only the column we touch needs to be declared.
    data_object = table("data_object", column("workflow_type", sa.String))
    for mapping in WORKFLOW_TYPE_MAP:
        old_name, new_name = mapping["old"], mapping["new"]
        statement = (
            data_object.update()
            .where(data_object.c.workflow_type == old_name)
            .values(workflow_type=new_name)
        )
        op.execute(statement)


def downgrade():
    """Revert `workflow_type` values in `data_object` to their old names.

    Mirror of `upgrade`: issues one UPDATE per entry in WORKFLOW_TYPE_MAP,
    rewriting rows whose `workflow_type` matches the post-migration name.
    """
    # Lightweight table stub — only the column we touch needs to be declared.
    data_object = table("data_object", column("workflow_type", sa.String))
    for mapping in WORKFLOW_TYPE_MAP:
        old_name, new_name = mapping["old"], mapping["new"]
        statement = (
            data_object.update()
            .where(data_object.c.workflow_type == new_name)
            .values(workflow_type=old_name)
        )
        op.execute(statement)

0 comments on commit e81b583

Please sign in to comment.