From 47a747e947fe87fda10f7d5d1470cb5349f7c4e8 Mon Sep 17 00:00:00 2001 From: Jane Date: Sun, 22 Sep 2024 12:55:10 -0700 Subject: [PATCH 1/5] Adding script to generate sqlite of dev_erd --- etl_scripts/generate_to_sqlite.py | 61 +++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 etl_scripts/generate_to_sqlite.py diff --git a/etl_scripts/generate_to_sqlite.py b/etl_scripts/generate_to_sqlite.py new file mode 100644 index 0000000..525eb7a --- /dev/null +++ b/etl_scripts/generate_to_sqlite.py @@ -0,0 +1,61 @@ +import sqlite3 +import psycopg2 +import etl_scripts.util as util + +# Connect to postgres db +pg_conn = util.connect_to_db() + +# Connect to sqlite db +sqlite_conn = sqlite3.connect('data_dump/PaliCanon.db') + +# Get all dev_erd tables from postgres db +pg_cur = pg_conn.cursor() +pg_cur.execute(""" + select table_name + from information_schema.tables + where table_schema = 'dev_erd' + """) +tables = pg_cur.fetchall() + +# Loop through all tables and copy data from postgres to sqlite +for table in tables: + table_name = table[0] + + # Create same table in sqlite + pg_cur.execute(f""" + select column_name, data_type + from information_schema.columns + where table_name = '{table_name}' + and table_schema = 'dev_erd' + """) + columns = pg_cur.fetchall() + + column_defs = [] + for column_name, data_type in columns: + sqlite_type = 'TEXT' + if data_type in ['integer', 'bigint']: + sqlite_type = 'INTEGER' + elif data_type in ['numeric', 'real', 'double precision']: + sqlite_type = 'REAL' + column_defs.append(f'{column_name} {sqlite_type}') + + create_table_sql = f'create table {table_name} ({', '.join(column_defs)})' + sqlite_conn.execute(create_table_sql) + + # Fetch data from postgres + pg_cur.execute(f'select * from dev_erd."{table_name}"') + rows = pg_cur.fetchall() + + # Insert data into sqlite + insert_sql = f""" + insert into {table_name} + values ({', '.join(['?' 
for _ in range(len(columns))])}) + """ + sqlite_conn.executemany(insert_sql, rows) + +sqlite_conn.commit() +pg_cur.close() +pg_conn.close() +sqlite_conn.close() + +print("Data transferred to SQLite") From 4d391846eb46e24a911a925cacff884588028915 Mon Sep 17 00:00:00 2001 From: Jane Date: Mon, 23 Sep 2024 17:37:55 -0700 Subject: [PATCH 2/5] Adding text content for stage tables html and sc_bilara from suttacentral/sc_data --- .gitmodules | 3 + etl_scripts/extract/arangodb_fetch.py | 15 +--- etl_scripts/load/arangodb_helpers.py | 12 +-- .../orchestration/extract_and_load_flow.py | 11 ++- etl_scripts/orchestration/main_flow.py | 9 +- etl_scripts/transform/mart/test.py | 89 +++++++++++++++++++ .../transform/stage/get_text_contents.py | 66 ++++++++++++++ etl_scripts/util.py | 11 ++- sc-data | 1 + suttacentral | 2 +- 10 files changed, 189 insertions(+), 30 deletions(-) create mode 100644 etl_scripts/transform/mart/test.py create mode 100644 etl_scripts/transform/stage/get_text_contents.py create mode 160000 sc-data diff --git a/.gitmodules b/.gitmodules index 5ba4470..66cde59 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "suttacentral"] path = suttacentral url = https://github.com/suttacentral/suttacentral.git +[submodule "sc-data"] + path = sc-data + url = https://github.com/suttacentral/sc-data diff --git a/etl_scripts/extract/arangodb_fetch.py b/etl_scripts/extract/arangodb_fetch.py index b19915c..dd4538d 100644 --- a/etl_scripts/extract/arangodb_fetch.py +++ b/etl_scripts/extract/arangodb_fetch.py @@ -1,8 +1,8 @@ +import os import gzip import json -import shutil import subprocess -from prefect import task, flow +from prefect import task @task(log_prints=True) def export_arangodb_data(collections: list, output_dir: str = '/tmp/arangodb-dump') -> None: @@ -31,7 +31,7 @@ def export_arangodb_data(collections: list, output_dir: str = '/tmp/arangodb-dum subprocess.run(arangodump_cmd, check=True) # Copy dump from container to project's data_dumps folder - local_output_path = '/Users/janekim/Developer/tipitaka_db/data_dump' + local_output_path = 'data_dump' docker_cp_cmd = [ "docker", "cp", f"sc-arangodb:{output_dir}", local_output_path ] @@ -80,12 +80,5 @@ def extract_gz_file(input_gz_path: str, collection: str) -> list: except Exception as e: print(f"Error extracting {input_gz_path}: {e}") return None - -if __name__ == "__main__": - html_in = 'data_dump/arangodb-dump/html_text_8a00c848c7b3360945795d3bc52ebe88.data.json.gz' - sc_bilara_in = 'data_dump/arangodb-dump/sc_bilara_texts_ede6cd7605f17ff53d131a783fb228e9.data.json.gz' - export_arangodb_data() - extract_gz_file(html_in) - extract_gz_file(sc_bilara_in) - print('successful') + \ No newline at end of file diff --git a/etl_scripts/load/arangodb_helpers.py b/etl_scripts/load/arangodb_helpers.py index 5f5a679..aa066a8 100644 --- a/etl_scripts/load/arangodb_helpers.py +++ b/etl_scripts/load/arangodb_helpers.py @@ -2,6 +2,7 @@ import subprocess import platform import time +import os from prefect import task, flow @task(log_prints=True) @@ -66,11 +67,10 @@ def start_sc_arangodb(): print(f"Error starting sc-arangodb service: {e}") @task(log_prints=True) -def pull_suttacentral_repo(): - """Pull latest suttacentral github repo""" - subprocess.run(["git", "checkout", "main"], cwd='suttacentral', check=True) - subprocess.run(["git", "pull", "origin", "main"], cwd='suttacentral', check=True) - print("SuttaCentral repository updated.") +def pull_submodules(): + """Update all submodules to their latest commits""" + 
subprocess.run(["git", "submodule", "update", "--remote", "--merge"], check=True) + print("All submodules updated.") @task(log_prints=True) def refresh_arangodb(): @@ -86,7 +86,7 @@ def refresh_arangodb(): start_suttacentral() # Load new data into ArangoDB - bash_script_path = '/Users/janekim/Developer/tipitaka_db/etl_scripts/util/run_suttacentral.sh' + bash_script_path = 'etl_scripts/util/run_suttacentral.sh' subprocess.run([bash_script_path], cwd='suttacentral', check=True, shell=True) print("Bash script executed successfully.") print("Waiting for Docker containers to initialize...") diff --git a/etl_scripts/orchestration/extract_and_load_flow.py b/etl_scripts/orchestration/extract_and_load_flow.py index d2887b9..ea6c278 100644 --- a/etl_scripts/orchestration/extract_and_load_flow.py +++ b/etl_scripts/orchestration/extract_and_load_flow.py @@ -1,5 +1,5 @@ from pathlib import Path -from prefect import task, flow +from prefect import flow from etl_scripts.util import connect_to_db from etl_scripts.extract import api_fetch, arangodb_fetch from etl_scripts.load.arangodb_helpers import start_suttacentral @@ -29,12 +29,10 @@ def extract_suttaplex_flow(schema: str, basket: str): conn.close() @flow(log_prints=True) -def extract_arangodb_flow(schema: str): +def extract_arangodb_flow(schema: str, collections): conn = connect_to_db() start_suttacentral() - dump_directory = Path('/Users/janekim/Developer/tipitaka_db/data_dump/arangodb-dump') - - collections = ['sc_bilara_texts', 'html_text', 'super_nav_details_edges'] + dump_directory = Path('data_dump/arangodb-dump') for collection in collections: # Find the file corresponding to the collection @@ -66,10 +64,11 @@ def extract_arangodb_flow(schema: str): @flow(log_prints=True) def extract_and_load_flow(): + collections = ['sc_bilara_texts', 'html_text', 'super_nav_details_edges'] schema = 'dev_raw' extract_suttaplex_flow(schema, 'sutta') extract_suttaplex_flow(schema, 'vinaya') extract_suttaplex_flow(schema, 'abhidhamma') - extract_arangodb_flow(schema) + extract_arangodb_flow(schema, collections) \ No newline at end of file diff --git a/etl_scripts/orchestration/main_flow.py b/etl_scripts/orchestration/main_flow.py index 9041b2e..968aa04 100644 --- a/etl_scripts/orchestration/main_flow.py +++ b/etl_scripts/orchestration/main_flow.py @@ -3,7 +3,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from prefect import flow -from etl_scripts.orchestration.extract_and_load_flow import extract_and_load_flow +from etl_scripts.orchestration.extract_and_load_flow import extract_and_load_flow, extract_arangodb_flow from etl_scripts.orchestration.transform_flow import transform_stage_flow, transform_mart_flow @flow(log_prints=True) @@ -17,5 +17,10 @@ def main_flow(): # Transform - Model transform_mart_flow() +@flow(log_prints=True) +def temp_flow(): + extract_arangodb_flow('dev_raw', ['text_contents']) + if __name__ == '__main__': - main_flow.serve(name="Pali Canon ETL") + temp_flow() + # main_flow.serve(name="Pali Canon ETL") diff --git a/etl_scripts/transform/mart/test.py b/etl_scripts/transform/mart/test.py new file mode 100644 index 0000000..d333759 --- /dev/null +++ b/etl_scripts/transform/mart/test.py @@ -0,0 +1,89 @@ +from psycopg2 import pool +import json +from bs4 import BeautifulSoup +from prefect import task, flow, unmapped +from prefect_dask import DaskTaskRunner +from etl_scripts.util import split_into_batches + +# Function to create the connection pool +def create_pool(): + return pool.SimpleConnectionPool( + 
minconn=1, + maxconn=10, + host='localhost', + port='5432', + dbname='pali_canon', + user='dbadmin', + password='root' + ) + +# Function to connect to the database +def connect_to_db(): + db_pool = create_pool() # Pool created each time within function scope + return db_pool.getconn(), db_pool + +# Task to clean file content +@task(log_prints=True) +def clean_text_content(file_path: str) -> str: + file_name = file_path.split('/')[-1] + file_type = file_name.split('.')[-1].lower() + + if file_type == 'html': + with open(file_path, 'r', encoding='utf-8') as file: + soup = BeautifulSoup(file, 'html.parser') + return soup.get_text(separator=' ') + + if file_type == 'json': + with open(file_path, 'r', encoding='utf-8') as file: + data = json.load(file) + return " ".join(value for value in data.values()).strip() + + raise ValueError(f'Unsupported file type: {file_type}') + +# Task to insert cleaned text into the database +@task(log_prints=True) +def insert_texts_from_files(translations_batch: list, schema: str, table_name: str) -> None: + conn, db_pool = connect_to_db() # Create new connection within task + cur = conn.cursor() + + try: + for translation in translations_batch: + _key, file_path = translation + if file_path: + text_content = clean_text_content(file_path) + + cur.execute(f""" + UPDATE {schema}."{table_name}" + SET text_content = %s + WHERE _key = %s + """, (text_content, _key)) + + conn.commit() + finally: + cur.close() + db_pool.putconn(conn) # Ensure connection is released back to pool + +# Main flow to run the translation updates +@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 2, "processes": False})) +def update_translations_in_parallel(schema, table_name, batch_size=3000): + conn, db_pool = connect_to_db() # Create connection to fetch initial data + cur = conn.cursor() + + cur.execute(f""" + SELECT _key, local_file_path + FROM {schema}."{table_name}" + """) + translations = cur.fetchall() + + cur.close() + db_pool.putconn(conn) # Release connection after fetching + + # Split into smaller batches + batches = list(split_into_batches(translations, batch_size)) + + # Map the insert task over the batches + insert_texts_from_files.map(batches, unmapped(schema), unmapped(table_name)) + +# Main entry point +if __name__ == '__main__': + update_translations_in_parallel('dev_stage', 'html_text_arangodb') \ No newline at end of file diff --git a/etl_scripts/transform/stage/get_text_contents.py b/etl_scripts/transform/stage/get_text_contents.py new file mode 100644 index 0000000..9ba591e --- /dev/null +++ b/etl_scripts/transform/stage/get_text_contents.py @@ -0,0 +1,66 @@ +import json +import psycopg2 +from bs4 import BeautifulSoup +from prefect_dask import DaskTaskRunner +from prefect import task, flow, unmapped +from etl_scripts.util import connect_to_db, split_into_batches + +def clean_text_content(file_path: str) -> str: + file_name = file_path.split('/')[-1] + file_type = file_name.split('.')[-1].lower() + + if file_type == 'html': + with open(file_path, 'r', encoding='utf-8') as file: + soup = BeautifulSoup(file, 'html.parser') + text = soup.get_text(separator=' ') + return text + + if file_type == 'json': + with open(file_path, 'r', encoding='utf-8') as file: + data = json.load(file) + + # Concatenate all values from the json + text = " ".join(value for value in data.values()).strip() + return text + + raise ValueError(f'Unsupported file type: {file_type}') + +@task(log_prints=True) +def process_translations_batch(translations_batch: list, schema: str, table_name: 
str): + try: + conn = connect_to_db() + cur = conn.cursor() + for translation in translations_batch: + _key, file_path = translation + if file_path: + text_content = clean_text_content(file_path).strip() + cur.execute(f""" + UPDATE {schema}."{table_name}" + SET text_content = %s + WHERE _key = %s + """, (text_content, _key)) + conn.commit() + cur.close() + except Exception as e: + print(f"Error processing batch: {e}") + finally: + if conn: + conn.close() + +@flow(log_prints=True, task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 10})) +def update_translations_in_parallel(schema, table_name, batch_size=500): + conn = connect_to_db() + cur = conn.cursor() + cur.execute(f""" + SELECT _key, local_file_path + FROM {schema}."{table_name}" + """) + translations = cur.fetchall() + conn.close() + + batches = list(split_into_batches(translations, batch_size)) + process_translations_batch.map(batches, unmapped(schema), unmapped(table_name)) + +if __name__ == '__main__': + update_translations_in_parallel('dev_stage', 'html_text_arangodb') + \ No newline at end of file diff --git a/etl_scripts/util.py b/etl_scripts/util.py index 11014fb..57c4b79 100644 --- a/etl_scripts/util.py +++ b/etl_scripts/util.py @@ -1,8 +1,7 @@ import psycopg2 from prefect import task, flow -@task(log_prints=True) -def connect_to_db() -> psycopg2.connect: +def connect_to_db(): conn = psycopg2.connect( host='localhost', dbname='pali_canon', @@ -11,10 +10,9 @@ def connect_to_db() -> psycopg2.connect: ) return conn -@task(log_prints=True) def get_postgres_data(schema, table_name) -> list: sql = f"""select * - from pali_canon.{schema}.{table_name} + from pali_canon.{schema}."{table_name}" """ conn = connect_to_db() cur = conn.cursor() @@ -24,3 +22,8 @@ def get_postgres_data(schema, table_name) -> list: cur.close() return data + +def split_into_batches(data, batch_size): + """Splits data into batches of specified size.""" + for i in range(0,len(data), batch_size): + yield data[i:i + batch_size] \ No newline at end of file diff --git a/sc-data b/sc-data new file mode 160000 index 0000000..16688e4 --- /dev/null +++ b/sc-data @@ -0,0 +1 @@ +Subproject commit 16688e40b2fbb91b3286175e7931a07528b96d7d diff --git a/suttacentral b/suttacentral index d88e182..d0dad3a 160000 --- a/suttacentral +++ b/suttacentral @@ -1 +1 @@ -Subproject commit d88e182a138b23f67e1da070b57692b67ee04042 +Subproject commit d0dad3a8fd103f33bc6dd775b71405f1236a292a From c612739565da9f172aee17a434a7f696f20ad335 Mon Sep 17 00:00:00 2001 From: Jane Date: Mon, 23 Sep 2024 19:03:54 -0700 Subject: [PATCH 3/5] Integrate parallel text_content generation to main orchestartion --- etl_scripts/orchestration/main_flow.py | 3 +- etl_scripts/orchestration/transform_flow.py | 5 ++ etl_scripts/transform/mart/test.py | 89 ------------------- .../transform/stage/generate_hierarchy.py | 3 - .../transform/stage/get_text_contents.py | 32 ++++--- .../models/mart/erd/Translation.sql | 11 ++- .../models/stage/stage_html_text_arangodb.sql | 3 +- .../stage/stage_sc_bilara_texts_arangodb.sql | 3 +- .../models/test_stage/stage_test.sql | 13 --- pali_canon_dbt/translations_errors.log | 1 + prefect_flow.log | 0 11 files changed, 40 insertions(+), 123 deletions(-) delete mode 100644 etl_scripts/transform/mart/test.py delete mode 100644 pali_canon_dbt/models/test_stage/stage_test.sql create mode 100644 pali_canon_dbt/translations_errors.log create mode 100644 prefect_flow.log diff --git a/etl_scripts/orchestration/main_flow.py b/etl_scripts/orchestration/main_flow.py index 968aa04..f50f5d2 
100644 --- a/etl_scripts/orchestration/main_flow.py +++ b/etl_scripts/orchestration/main_flow.py @@ -22,5 +22,4 @@ def temp_flow(): extract_arangodb_flow('dev_raw', ['text_contents']) if __name__ == '__main__': - temp_flow() - # main_flow.serve(name="Pali Canon ETL") + main_flow.serve(name="Pali Canon ETL") diff --git a/etl_scripts/orchestration/transform_flow.py b/etl_scripts/orchestration/transform_flow.py index a6ee1c4..2722b91 100644 --- a/etl_scripts/orchestration/transform_flow.py +++ b/etl_scripts/orchestration/transform_flow.py @@ -2,6 +2,7 @@ from prefect import flow from etl_scripts.util import get_postgres_data from etl_scripts.transform.stage.generate_hierarchy import preprocess_graph, insert_graph_to_postgres +from etl_scripts.transform.stage.get_text_contents import update_translations_in_parallel @flow(log_prints=True) def stage_load_hierarchy_table(): @@ -27,6 +28,10 @@ def transform_stage_flow(): run_dbt('stage') stage_load_hierarchy_table() + os.chdir('..') + update_translations_in_parallel('dev_stage', 'html_text_arangodb') + update_translations_in_parallel('dev_stage', 'sc_bilara_texts_arangodb') + @flow(log_prints=True) def transform_mart_flow(): run_dbt('mart.erd') diff --git a/etl_scripts/transform/mart/test.py b/etl_scripts/transform/mart/test.py deleted file mode 100644 index d333759..0000000 --- a/etl_scripts/transform/mart/test.py +++ /dev/null @@ -1,89 +0,0 @@ -from psycopg2 import pool -import json -from bs4 import BeautifulSoup -from prefect import task, flow, unmapped -from prefect_dask import DaskTaskRunner -from etl_scripts.util import split_into_batches - -# Function to create the connection pool -def create_pool(): - return pool.SimpleConnectionPool( - minconn=1, - maxconn=10, - host='localhost', - port='5432', - dbname='pali_canon', - user='dbadmin', - password='root' - ) - -# Function to connect to the database -def connect_to_db(): - db_pool = create_pool() # Pool created each time within function scope - return db_pool.getconn(), db_pool - -# Task to clean file content -@task(log_prints=True) -def clean_text_content(file_path: str) -> str: - file_name = file_path.split('/')[-1] - file_type = file_name.split('.')[-1].lower() - - if file_type == 'html': - with open(file_path, 'r', encoding='utf-8') as file: - soup = BeautifulSoup(file, 'html.parser') - return soup.get_text(separator=' ') - - if file_type == 'json': - with open(file_path, 'r', encoding='utf-8') as file: - data = json.load(file) - return " ".join(value for value in data.values()).strip() - - raise ValueError(f'Unsupported file type: {file_type}') - -# Task to insert cleaned text into the database -@task(log_prints=True) -def insert_texts_from_files(translations_batch: list, schema: str, table_name: str) -> None: - conn, db_pool = connect_to_db() # Create new connection within task - cur = conn.cursor() - - try: - for translation in translations_batch: - _key, file_path = translation - if file_path: - text_content = clean_text_content(file_path) - - cur.execute(f""" - UPDATE {schema}."{table_name}" - SET text_content = %s - WHERE _key = %s - """, (text_content, _key)) - - conn.commit() - finally: - cur.close() - db_pool.putconn(conn) # Ensure connection is released back to pool - -# Main flow to run the translation updates -@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 2, "processes": False})) -def update_translations_in_parallel(schema, table_name, batch_size=3000): - conn, db_pool = connect_to_db() # Create connection to fetch initial data - cur = conn.cursor() - - 
cur.execute(f""" - SELECT _key, local_file_path - FROM {schema}."{table_name}" - """) - translations = cur.fetchall() - - cur.close() - db_pool.putconn(conn) # Release connection after fetching - - # Split into smaller batches - batches = list(split_into_batches(translations, batch_size)) - - # Map the insert task over the batches - insert_texts_from_files.map(batches, unmapped(schema), unmapped(table_name)) - -# Main entry point -if __name__ == '__main__': - update_translations_in_parallel('dev_stage', 'html_text_arangodb') \ No newline at end of file diff --git a/etl_scripts/transform/stage/generate_hierarchy.py b/etl_scripts/transform/stage/generate_hierarchy.py index 0227684..80c968b 100644 --- a/etl_scripts/transform/stage/generate_hierarchy.py +++ b/etl_scripts/transform/stage/generate_hierarchy.py @@ -1,9 +1,7 @@ import json -from prefect import task, flow from collections import defaultdict from etl_scripts.util import connect_to_db, get_postgres_data -@task(log_prints=True) def preprocess_graph(edges: list) -> json: """Generate preprocess graph dictionary structure s.t. keys are parent_uids and children are list of child_uids. @@ -23,7 +21,6 @@ def preprocess_graph(edges: list) -> json: return parent_to_child -@task(log_prints=True) def insert_graph_to_postgres(graph: dict) -> None: conn = connect_to_db() cur = conn.cursor() diff --git a/etl_scripts/transform/stage/get_text_contents.py b/etl_scripts/transform/stage/get_text_contents.py index 9ba591e..ea0076a 100644 --- a/etl_scripts/transform/stage/get_text_contents.py +++ b/etl_scripts/transform/stage/get_text_contents.py @@ -1,5 +1,7 @@ +import os +import sys import json -import psycopg2 +import logging from bs4 import BeautifulSoup from prefect_dask import DaskTaskRunner from prefect import task, flow, unmapped @@ -27,27 +29,37 @@ def clean_text_content(file_path: str) -> str: @task(log_prints=True) def process_translations_batch(translations_batch: list, schema: str, table_name: str): + conn = None try: conn = connect_to_db() cur = conn.cursor() for translation in translations_batch: _key, file_path = translation if file_path: - text_content = clean_text_content(file_path).strip() - cur.execute(f""" - UPDATE {schema}."{table_name}" - SET text_content = %s - WHERE _key = %s - """, (text_content, _key)) + if not os.path.exists(file_path): + print(f'File not found: {file_path}') + print(os.getcwd()) + break + try: + text_content = clean_text_content(file_path).strip() + cur.execute(f""" + UPDATE {schema}."{table_name}" + SET text_content = %s + WHERE _key = %s + """, (text_content, _key)) + except Exception as e: + print(f'Error processing translation {_key}: {e}') conn.commit() cur.close() except Exception as e: - print(f"Error processing batch: {e}") + print.error(f'Error processing batch: {e}') finally: if conn: conn.close() -@flow(log_prints=True, task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 10})) +@flow(log_prints=True, task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 1, + "threads_per_worker": 10, + "processes": False})) def update_translations_in_parallel(schema, table_name, batch_size=500): conn = connect_to_db() cur = conn.cursor() @@ -61,6 +73,4 @@ def update_translations_in_parallel(schema, table_name, batch_size=500): batches = list(split_into_batches(translations, batch_size)) process_translations_batch.map(batches, unmapped(schema), unmapped(table_name)) -if __name__ == '__main__': - update_translations_in_parallel('dev_stage', 'html_text_arangodb') \ No newline at end of file diff --git 
a/pali_canon_dbt/models/mart/erd/Translation.sql b/pali_canon_dbt/models/mart/erd/Translation.sql index d3a280e..ec08da2 100644 --- a/pali_canon_dbt/models/mart/erd/Translation.sql +++ b/pali_canon_dbt/models/mart/erd/Translation.sql @@ -10,7 +10,8 @@ with all_translations as ( t.lang, t.author_uid, coalesce(html.local_file_path, sc.local_file_path) as local_file_path, - 'sutta' as basket + 'sutta' as basket, + coalesce(html.text_content, sc.text_content) as text_content from {{ ref('stage_sutta_translations_suttaplex_sc') }} as t left join @@ -27,7 +28,9 @@ with all_translations as ( t.lang, t.author_uid, coalesce(html.local_file_path, sc.local_file_path) as local_file_path, - 'vinaya' as basket + 'vinaya' as basket, + coalesce(html.text_content, sc.text_content) as text_content + from {{ ref('stage_vinaya_translations_suttaplex_sc') }} as t left join @@ -44,7 +47,9 @@ with all_translations as ( t.lang, t.author_uid, coalesce(html.local_file_path, sc.local_file_path) as local_file_path, - 'abhidhamma' as basket + 'abhidhamma' as basket, + coalesce(html.text_content, sc.text_content) as text_content + from {{ ref('stage_abhidhamma_translations_suttaplex_sc') }} as t left join diff --git a/pali_canon_dbt/models/stage/stage_html_text_arangodb.sql b/pali_canon_dbt/models/stage/stage_html_text_arangodb.sql index d860c87..3a77f65 100644 --- a/pali_canon_dbt/models/stage/stage_html_text_arangodb.sql +++ b/pali_canon_dbt/models/stage/stage_html_text_arangodb.sql @@ -8,7 +8,8 @@ with transform_file_path as ( select *, - replace(file_path, '/opt/sc/sc-flask/sc-data/', 'sc-data/') as local_file_path + replace(file_path, '/opt/sc/sc-flask/sc-data/', 'sc-data/') as local_file_path, + null as text_content from {{ source('dev_raw', 'html_text_arangodb') }} ) diff --git a/pali_canon_dbt/models/stage/stage_sc_bilara_texts_arangodb.sql b/pali_canon_dbt/models/stage/stage_sc_bilara_texts_arangodb.sql index 9ea64ce..4fb1c87 100644 --- a/pali_canon_dbt/models/stage/stage_sc_bilara_texts_arangodb.sql +++ b/pali_canon_dbt/models/stage/stage_sc_bilara_texts_arangodb.sql @@ -8,7 +8,8 @@ with transform_file_path as ( select *, - replace(file_path, '/opt/sc/sc-flask/sc-data/', 'sc-data/') as local_file_path + replace(file_path, '/opt/sc/sc-flask/sc-data/', 'sc-data/') as local_file_path, + null as text_content from {{ source('dev_raw', 'sc_bilara_texts_arangodb') }} ) diff --git a/pali_canon_dbt/models/test_stage/stage_test.sql b/pali_canon_dbt/models/test_stage/stage_test.sql deleted file mode 100644 index 2ccccbe..0000000 --- a/pali_canon_dbt/models/test_stage/stage_test.sql +++ /dev/null @@ -1,13 +0,0 @@ -{{ config( - schema='stage', - alias='test', - materialized='table', - post_hook=[create_index("test", "uid")] -) }} - -with distinct_suttaplex as ( - select * - from {{ source('dev_raw', 'abhidhamma_suttaplex_sc') }} -) - -select * from distinct_suttaplex \ No newline at end of file diff --git a/pali_canon_dbt/translations_errors.log b/pali_canon_dbt/translations_errors.log new file mode 100644 index 0000000..a16cbe8 --- /dev/null +++ b/pali_canon_dbt/translations_errors.log @@ -0,0 +1 @@ +Error: [Errno 2] No such file or directory: 'sc-data/sc_bilara_data/translation/en/sujato/sutta/kn/thag/thag1.88_translation-en-sujato.json' diff --git a/prefect_flow.log b/prefect_flow.log new file mode 100644 index 0000000..e69de29 From ee1bd8ec5b2e38b51c8fc5f1f6384c8c9a283823 Mon Sep 17 00:00:00 2001 From: Jane Date: Tue, 24 Sep 2024 09:08:13 -0700 Subject: [PATCH 4/5] Refractor - renaming vars, generating docstrings --- 
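Notes (reference only, not part of the diff below): the generate_to_sqlite refactor in this patch wraps the copy logic in a function with schema and file-path arguments. For review, here is a condensed standalone sketch of the same Postgres-to-SQLite copy, assuming the existing etl_scripts.util.connect_to_db() helper; the function name, the type map, and the use of %s parameters for the information_schema lookups are illustrative choices of this sketch, not code from the patch.

import sqlite3
import etl_scripts.util as util

# Postgres -> SQLite type affinities; anything unlisted falls back to TEXT.
PG_TO_SQLITE = {
    'integer': 'INTEGER',
    'bigint': 'INTEGER',
    'numeric': 'REAL',
    'real': 'REAL',
    'double precision': 'REAL',
}

def copy_schema_to_sqlite(schema: str = 'dev_erd', sqlite_path: str = 'data_dump/PaliCanon.db') -> None:
    pg_conn = util.connect_to_db()
    pg_cur = pg_conn.cursor()
    lite = sqlite3.connect(sqlite_path)
    # Bind the schema name as a query parameter rather than formatting it into the SQL string.
    pg_cur.execute(
        "select table_name from information_schema.tables where table_schema = %s",
        (schema,),
    )
    for (table_name,) in pg_cur.fetchall():
        pg_cur.execute(
            "select column_name, data_type from information_schema.columns "
            "where table_schema = %s and table_name = %s order by ordinal_position",
            (schema, table_name),
        )
        columns = pg_cur.fetchall()
        # Identifiers cannot be bound as parameters, so the column definitions are formatted in.
        column_defs = ", ".join(f"{name} {PG_TO_SQLITE.get(dtype, 'TEXT')}" for name, dtype in columns)
        lite.execute(f"create table {table_name} ({column_defs})")
        pg_cur.execute(f'select * from {schema}."{table_name}"')
        placeholders = ", ".join("?" for _ in columns)
        lite.executemany(f"insert into {table_name} values ({placeholders})", pg_cur.fetchall())
    lite.commit()
    lite.close()
    pg_cur.close()
    pg_conn.close()

The resulting file can be sanity-checked by listing its tables with: select name from sqlite_master where type='table'.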
etl_scripts/extract/api_fetch.py | 4 - etl_scripts/extract/arangodb_fetch.py | 3 - etl_scripts/generate_to_sqlite.py | 116 ++++++++++-------- etl_scripts/load/arangodb_helpers.py | 15 +-- etl_scripts/load/load.py | 6 +- .../orchestration/extract_and_load_flow.py | 18 ++- etl_scripts/orchestration/main_flow.py | 4 - etl_scripts/orchestration/transform_flow.py | 11 +- .../transform/stage/generate_hierarchy.py | 12 +- .../transform/stage/get_text_contents.py | 33 ++++- etl_scripts/util.py | 31 ++++- 11 files changed, 162 insertions(+), 91 deletions(-) diff --git a/etl_scripts/extract/api_fetch.py b/etl_scripts/extract/api_fetch.py index bdb0e57..5da14f0 100644 --- a/etl_scripts/extract/api_fetch.py +++ b/etl_scripts/extract/api_fetch.py @@ -1,8 +1,5 @@ import requests -import pandas as pd -from prefect import flow, task -@task(log_prints=True) def get_menu_data(uid: str) -> dict: """ Retrieves menu data from SuttaCentral API based on the provided uid. @@ -24,7 +21,6 @@ def get_menu_data(uid: str) -> dict: menu_data = None return menu_data -@task(log_prints=True) def get_suttaplex(basket: str) -> None: """ Retrieves suttaplex data from the SuttaCentral API for given basket. diff --git a/etl_scripts/extract/arangodb_fetch.py b/etl_scripts/extract/arangodb_fetch.py index dd4538d..84717fe 100644 --- a/etl_scripts/extract/arangodb_fetch.py +++ b/etl_scripts/extract/arangodb_fetch.py @@ -1,10 +1,8 @@ -import os import gzip import json import subprocess from prefect import task -@task(log_prints=True) def export_arangodb_data(collections: list, output_dir: str = '/tmp/arangodb-dump') -> None: """Export arangodb data from arangodump command inside Docker container. @@ -43,7 +41,6 @@ def export_arangodb_data(collections: list, output_dir: str = '/tmp/arangodb-dum except subprocess.CalledProcessError as e: print(f'Error during ArangoDB data dump: {e}') -@task(log_prints=True) def extract_gz_file(input_gz_path: str, collection: str) -> list: """ Extract a .gz file in-memory and return the JSON content as a list of dictionaries. diff --git a/etl_scripts/generate_to_sqlite.py b/etl_scripts/generate_to_sqlite.py index 525eb7a..cf2a9e6 100644 --- a/etl_scripts/generate_to_sqlite.py +++ b/etl_scripts/generate_to_sqlite.py @@ -1,61 +1,73 @@ import sqlite3 -import psycopg2 import etl_scripts.util as util -# Connect to postgres db -pg_conn = util.connect_to_db() +def generate_to_sqlite(sqlite_filepath: str='data_dump/PaliCanon.db', + schema: str='dev_erd'): + """Copies a schema (and its tables) from PostgreSQL to a SQLite PaliCanon.db -# Connect to sqlite db -sqlite_conn = sqlite3.connect('data_dump/PaliCanon.db') + Args: + sqlite_filepath (str, optional): File path of target (sqlite db). + Defaults to 'data_dump/PaliCanon.db'. + schema (str, optional): Schema from PostgreSQL db to be copied. + Defaults to 'dev_erd'.
+ """ + # Connect to postgres db + pg_conn = util.connect_to_db() -# Get all dev_erd tables from postgres db -pg_cur = pg_conn.cursor() -pg_cur.execute(""" - select table_name - from information_schema.tables - where table_schema = 'dev_erd' - """) -tables = pg_cur.fetchall() + # Connect to sqlite db + sqlite_conn = sqlite3.connect(sqlite_filepath) -# Loop through all tables and copy data from postgres to sqlite -for table in tables: - table_name = table[0] - - # Create same table in sqlite + # Get all schema tables from postgres db + pg_cur = pg_conn.cursor() pg_cur.execute(f""" - select column_name, data_type - from information_schema.columns - where table_name = '{table_name}' - and table_schema = 'dev_erd' - """) - columns = pg_cur.fetchall() - - column_defs = [] - for column_name, data_type in columns: - sqlite_type = 'TEXT' - if data_type in ['integer', 'bigint']: - sqlite_type = 'INTEGER' - elif data_type in ['numeric', 'real', 'double precision']: - sqlite_type = 'REAL' - column_defs.append(f'{column_name} {sqlite_type}') + select table_name + from information_schema.tables + where table_schema = {schema} + """) + tables = pg_cur.fetchall() + + # Loop through all tables and copy data from postgres to sqlite + for table in tables: + table_name = table[0] + + # Create same table in sqlite + pg_cur.execute(f""" + select column_name, data_type + from information_schema.columns + where table_name = '{table_name}' + and table_schema = '{schema}' + """) + columns = pg_cur.fetchall() + + column_defs = [] + for column_name, data_type in columns: + sqlite_type = 'TEXT' + if data_type in ['integer', 'bigint']: + sqlite_type = 'INTEGER' + elif data_type in ['numeric', 'real', 'double precision']: + sqlite_type = 'REAL' + column_defs.append(f'{column_name} {sqlite_type}') + + create_table_sql = f'create table {table_name} ({', '.join(column_defs)})' + sqlite_conn.execute(create_table_sql) - create_table_sql = f'create table {table_name} ({', '.join(column_defs)})' - sqlite_conn.execute(create_table_sql) - - # Fetch data from postgres - pg_cur.execute(f'select * from dev_erd."{table_name}"') - rows = pg_cur.fetchall() - - # Insert data into sqlite - insert_sql = f""" - insert into {table_name} - values ({', '.join(['?' for _ in range(len(columns))])}) - """ - sqlite_conn.executemany(insert_sql, rows) - -sqlite_conn.commit() -pg_cur.close() -pg_conn.close() -sqlite_conn.close() + # Fetch data from postgres + pg_cur.execute(f'select * from {schema}."{table_name}"') + rows = pg_cur.fetchall() + + # Insert data into sqlite + insert_sql = f""" + insert into {table_name} + values ({', '.join(['?' 
for _ in range(len(columns))])}) + """ + sqlite_conn.executemany(insert_sql, rows) + + sqlite_conn.commit() + pg_cur.close() + pg_conn.close() + sqlite_conn.close() + + print("Data transferred to SQLite") -print("Data transferred to SQLite") +if __name__ == '__main__': + generate_to_sqlite() \ No newline at end of file diff --git a/etl_scripts/load/arangodb_helpers.py b/etl_scripts/load/arangodb_helpers.py index aa066a8..a60711e 100644 --- a/etl_scripts/load/arangodb_helpers.py +++ b/etl_scripts/load/arangodb_helpers.py @@ -5,7 +5,6 @@ import os from prefect import task, flow -@task(log_prints=True) def is_docker_running(): """Check if Docker daemon is running.""" try: @@ -14,7 +13,6 @@ def is_docker_running(): except subprocess.CalledProcessError: return False -@task(log_prints=True) def start_docker(): """Start Docker depending on the OS.""" system = platform.system() @@ -41,8 +39,8 @@ def start_docker(): print(f"Error starting Docker: {e}") return False -def start_suttacentral(): - """Start suttacentral service.""" +def start_suttacentral_docker(): + """Start suttacentral Docker service.""" if not is_docker_running(): start_docker() @@ -53,8 +51,8 @@ def start_suttacentral(): except subprocess.CalledProcessError as e: print(f"Error starting suttacentral service: {e}") -@task(log_prints=True) def start_sc_arangodb(): + """Start sc_arangodb service""" if not is_docker_running(): start_docker() @@ -66,13 +64,11 @@ def start_sc_arangodb(): except subprocess.CalledProcessError as e: print(f"Error starting sc-arangodb service: {e}") -@task(log_prints=True) def pull_submodules(): """Update all submodules to their latest commits""" subprocess.run(["git", "submodule", "update", "--remote", "--merge"], check=True) print("All submodules updated.") -@task(log_prints=True) def refresh_arangodb(): """Pull the latest updates and refresh ArangoDB.""" try: @@ -83,7 +79,7 @@ def refresh_arangodb(): return # Start the suttacentral service if it's not running - start_suttacentral() + start_suttacentral_docker() # Load new data into ArangoDB bash_script_path = 'etl_scripts/util/run_suttacentral.sh' @@ -99,6 +95,3 @@ def refresh_arangodb(): except subprocess.CalledProcessError as e: print(f"Error during data refresh: {e}") - -if __name__ == "__main__": - refresh_arangodb() \ No newline at end of file diff --git a/etl_scripts/load/load.py b/etl_scripts/load/load.py index 50a27b8..d8c9036 100644 --- a/etl_scripts/load/load.py +++ b/etl_scripts/load/load.py @@ -1,13 +1,13 @@ import json from prefect import task -@task(log_prints=True) -def generate_create_sql(json_data, schema, table_name) -> str: +def generate_create_sql(json_data: json, schema: str, table_name: str) -> str: """ Generates a CREATE TABLE statement dynamically from all keys in the given JSON data. 
Args: json_data (json): JSON file from API get request + schema (string): Name of schema target table should be placed in table_name (string): Name of target table in Postgres Returns: @@ -40,13 +40,13 @@ def generate_create_sql(json_data, schema, table_name) -> str: return sql -@task(log_prints=True) def insert_to_db(data, conn, schema, table_name) -> None: """Inserts data into PostgreSQL Args: data (json): Data to be ingested conn (psycopg2 object): Connection to data warehouse + schema (string): Name of schema target table should be placed in table_name (string): Target table name """ with conn.cursor() as cur: diff --git a/etl_scripts/orchestration/extract_and_load_flow.py b/etl_scripts/orchestration/extract_and_load_flow.py index ea6c278..4a0bdae 100644 --- a/etl_scripts/orchestration/extract_and_load_flow.py +++ b/etl_scripts/orchestration/extract_and_load_flow.py @@ -2,12 +2,18 @@ from prefect import flow from etl_scripts.util import connect_to_db from etl_scripts.extract import api_fetch, arangodb_fetch -from etl_scripts.load.arangodb_helpers import start_suttacentral +from etl_scripts.load.arangodb_helpers import start_suttacentral_docker from etl_scripts.load.load import generate_create_sql, insert_to_db @flow(log_prints=True) def extract_suttaplex_flow(schema: str, basket: str): + """Extract suttaplex data from SuttaCentral API. + + Args: + schema (str): PostgreSQL schema in which data should be placed + basket (str): Pali canon basket to be extracted + """ conn = connect_to_db() json_data = api_fetch.get_suttaplex(basket) @@ -30,8 +36,14 @@ def extract_suttaplex_flow(schema: str, basket: str): @flow(log_prints=True) def extract_arangodb_flow(schema: str, collections): + """Extract data from arangodb backend db. + + Args: + schema (str): PostgreSQL schema in which data should be placed + collections (str): Name of arangodb collection to be extracted + """ conn = connect_to_db() - start_suttacentral() + start_suttacentral_docker() dump_directory = Path('data_dump/arangodb-dump') for collection in collections: @@ -64,6 +76,8 @@ def extract_arangodb_flow(schema: str, collections): @flow(log_prints=True) def extract_and_load_flow(): + """Flow for extraction and loading of dev_raw tables + """ collections = ['sc_bilara_texts', 'html_text', 'super_nav_details_edges'] schema = 'dev_raw' extract_suttaplex_flow(schema, 'sutta') diff --git a/etl_scripts/orchestration/main_flow.py b/etl_scripts/orchestration/main_flow.py index f50f5d2..5a5b33b 100644 --- a/etl_scripts/orchestration/main_flow.py +++ b/etl_scripts/orchestration/main_flow.py @@ -17,9 +17,5 @@ def main_flow(): # Transform - Model transform_mart_flow() -@flow(log_prints=True) -def temp_flow(): - extract_arangodb_flow('dev_raw', ['text_contents']) - if __name__ == '__main__': main_flow.serve(name="Pali Canon ETL") diff --git a/etl_scripts/orchestration/transform_flow.py b/etl_scripts/orchestration/transform_flow.py index 2722b91..cd85551 100644 --- a/etl_scripts/orchestration/transform_flow.py +++ b/etl_scripts/orchestration/transform_flow.py @@ -6,14 +6,17 @@ @flow(log_prints=True) def stage_load_hierarchy_table(): + """Create and ingest hierarchical table to dev_stage + """ edges = get_postgres_data('dev_raw', 'super_nav_details_edges_arangodb') graph = preprocess_graph(edges) insert_graph_to_postgres(graph) print('graph_table created') - @flow(log_prints=True) -def run_dbt(dir: str): +def run_dbt(): + """Run dbt process to create tables in PostgreSQL. 
+ """ project_dir = 'pali_canon_dbt' print('Current working directory:', os.getcwd()) @@ -25,6 +28,8 @@ def run_dbt(dir: str): @flow(log_prints=True) def transform_stage_flow(): + """Flow to run dbt and transform data to create stage tables + """ run_dbt('stage') stage_load_hierarchy_table() @@ -34,5 +39,7 @@ def transform_stage_flow(): @flow(log_prints=True) def transform_mart_flow(): + """Flow to run dbt for mart tables. + """ run_dbt('mart.erd') \ No newline at end of file diff --git a/etl_scripts/transform/stage/generate_hierarchy.py b/etl_scripts/transform/stage/generate_hierarchy.py index 80c968b..d5fdac2 100644 --- a/etl_scripts/transform/stage/generate_hierarchy.py +++ b/etl_scripts/transform/stage/generate_hierarchy.py @@ -1,13 +1,16 @@ import json from collections import defaultdict -from etl_scripts.util import connect_to_db, get_postgres_data +from etl_scripts.util import connect_to_db def preprocess_graph(edges: list) -> json: """Generate preprocess graph dictionary structure s.t. keys are parent_uids and children are list of child_uids. + + Args: + edges: data from a Postgresql table Returns: - json: The graph dictionary of parent and children uids + json: The dictionary of parent_uid: list[children_uids] """ parent_to_child = defaultdict(list) child_uids = set() @@ -22,6 +25,11 @@ def preprocess_graph(edges: list) -> json: return parent_to_child def insert_graph_to_postgres(graph: dict) -> None: + """Insert hierarichal data to a table in PostgreSQL + + Args: + graph (dict): The dictionary of parent_uid: list[children_uids] + """ conn = connect_to_db() cur = conn.cursor() diff --git a/etl_scripts/transform/stage/get_text_contents.py b/etl_scripts/transform/stage/get_text_contents.py index ea0076a..c8d2dcf 100644 --- a/etl_scripts/transform/stage/get_text_contents.py +++ b/etl_scripts/transform/stage/get_text_contents.py @@ -1,13 +1,22 @@ import os -import sys import json -import logging from bs4 import BeautifulSoup from prefect_dask import DaskTaskRunner from prefect import task, flow, unmapped from etl_scripts.util import connect_to_db, split_into_batches def clean_text_content(file_path: str) -> str: + """Removes unnecessary characters in .json or .html files. + + Args: + file_path (str): File path of the file + + Raises: + ValueError: Raised if file that is not .json or .html is the input + + Returns: + str: Cleaned text + """ file_name = file_path.split('/')[-1] file_type = file_name.split('.')[-1].lower() @@ -29,6 +38,15 @@ def clean_text_content(file_path: str) -> str: @task(log_prints=True) def process_translations_batch(translations_batch: list, schema: str, table_name: str): + """Process translations by batch. Ingests textual content to table_name via + a file_path provided by table_name from PostgreSQL db. + + Args: + translations_batch (list): Each element is a list of _key and file_path + schema (str): Schema of table_name + table_name (str): Table name from schema. Is the source of translations and + target of text_content + """ conn = None try: conn = connect_to_db() @@ -54,13 +72,22 @@ def process_translations_batch(translations_batch: list, schema: str, table_name except Exception as e: print.error(f'Error processing batch: {e}') finally: + # Always close connection at the end of batch processing. 
if conn: conn.close() @flow(log_prints=True, task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 1, "threads_per_worker": 10, "processes": False})) -def update_translations_in_parallel(schema, table_name, batch_size=500): +def update_translations_in_parallel(schema: str, table_name: str, batch_size=500): + """Concurrently processes all batches of translations. + + Args: + schema (str): Schema of table_name + table_name (str): Table name from schema. Is the source of translations and + target of text_content + batch_size (int, optional): Batch size of translations. Defaults to 500. + """ conn = connect_to_db() cur = conn.cursor() cur.execute(f""" diff --git a/etl_scripts/util.py b/etl_scripts/util.py index 57c4b79..87f602c 100644 --- a/etl_scripts/util.py +++ b/etl_scripts/util.py @@ -1,7 +1,11 @@ import psycopg2 -from prefect import task, flow -def connect_to_db(): +def connect_to_db() -> psycopg2.connect: + """Connects to PostgreSQL db pali_canon. + + Returns: + psycopg2.connect: The db connection + """ conn = psycopg2.connect( host='localhost', dbname='pali_canon', @@ -10,7 +14,17 @@ def connect_to_db(): ) return conn -def get_postgres_data(schema, table_name) -> list: +def get_postgres_data(schema: str, table_name: str) -> list: + """Returns data from PostgreSQL table. + + Args: + schema (str): Schema of table_name + table_name (str): Name of the table we are fetching + data from + + Returns: + list: Data from schema.table_name + """ sql = f"""select * from pali_canon.{schema}."{table_name}" """ @@ -23,7 +37,14 @@ def get_postgres_data(schema, table_name) -> list: return data -def split_into_batches(data, batch_size): - """Splits data into batches of specified size.""" +def split_into_batches(data: list, batch_size: int) -> list: + """Splits data into batches of specified size. 
+ + Args: + data (list): List to be separated into batches + batch_size (int): Size of each batch + Yields: + list: The list batch + """ for i in range(0,len(data), batch_size): yield data[i:i + batch_size] \ No newline at end of file From 3eab7132f3b2fd9f7de73d06e9964738423970f8 Mon Sep 17 00:00:00 2001 From: Jane Date: Tue, 24 Sep 2024 09:22:01 -0700 Subject: [PATCH 5/5] Delete init files --- etl_scripts/__init__.py | 0 etl_scripts/extract/__init__.py | 0 etl_scripts/load/__init__.py | 0 etl_scripts/orchestration/__init__.py | 0 etl_scripts/orchestration/transform_flow.py | 5 ++++- etl_scripts/transform/__init__.py | 0 etl_scripts/transform/stage/__init__.py | 0 7 files changed, 4 insertions(+), 1 deletion(-) delete mode 100644 etl_scripts/__init__.py delete mode 100644 etl_scripts/extract/__init__.py delete mode 100644 etl_scripts/load/__init__.py delete mode 100644 etl_scripts/orchestration/__init__.py delete mode 100644 etl_scripts/transform/__init__.py delete mode 100644 etl_scripts/transform/stage/__init__.py diff --git a/etl_scripts/__init__.py b/etl_scripts/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/etl_scripts/extract/__init__.py b/etl_scripts/extract/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/etl_scripts/load/__init__.py b/etl_scripts/load/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/etl_scripts/orchestration/__init__.py b/etl_scripts/orchestration/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/etl_scripts/orchestration/transform_flow.py b/etl_scripts/orchestration/transform_flow.py index cd85551..8542320 100644 --- a/etl_scripts/orchestration/transform_flow.py +++ b/etl_scripts/orchestration/transform_flow.py @@ -14,8 +14,11 @@ def stage_load_hierarchy_table(): print('graph_table created') @flow(log_prints=True) -def run_dbt(): +def run_dbt(dir: str): """Run dbt process to create tables in PostgreSQL. + + Args: + dir (str): Directory to be run """ project_dir = 'pali_canon_dbt' diff --git a/etl_scripts/transform/__init__.py b/etl_scripts/transform/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/etl_scripts/transform/stage/__init__.py b/etl_scripts/transform/stage/__init__.py deleted file mode 100644 index e69de29..0000000
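Reference sketches for the pieces touched across this series (illustrative only, not part of the patches).

Text cleaning: the stage tables store a local_file_path into the sc-data checkout, and get_text_contents.clean_text_content flattens each file to plain text. A minimal illustration of the two cases with made-up content, assuming only BeautifulSoup 4:

from bs4 import BeautifulSoup

# HTML texts: strip the markup and keep the visible text.
html = "<article><h1>DN 1</h1><p>Thus have I heard.</p></article>"  # illustrative markup
print(BeautifulSoup(html, 'html.parser').get_text(separator=' ').strip())
# -> DN 1 Thus have I heard.

# sc_bilara texts: a JSON file mapping segment ids to strings; the values are joined.
segments = {"dn1:1.1": "Thus have I heard.", "dn1:1.2": "At one time..."}  # illustrative segments
print(" ".join(segments.values()).strip())
# -> Thus have I heard. At one time...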
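Batched updates: process_translations_batch maps over batches of (_key, local_file_path) rows and writes the cleaned text back. A sketch of one way such a batch worker could report per-row failures through the logging module, assuming the connect_to_db helper from etl_scripts.util and the clean_text_content function added in this series; the worker name is hypothetical and the schema and table names are whatever the caller passes in:

import os
import logging
from etl_scripts.util import connect_to_db
from etl_scripts.transform.stage.get_text_contents import clean_text_content

logger = logging.getLogger(__name__)

def update_batch(batch: list, schema: str, table_name: str) -> None:
    """Clean each referenced file and write its text back, logging failures per row."""
    conn = connect_to_db()
    try:
        with conn.cursor() as cur:
            for _key, file_path in batch:
                if not file_path or not os.path.exists(file_path):
                    logger.warning("Skipping %s: missing file %r", _key, file_path)
                    continue  # keep going; one missing file should not stop the batch
                try:
                    text = clean_text_content(file_path).strip()
                except Exception:
                    logger.exception("Failed to clean %s", file_path)
                    continue
                cur.execute(
                    f'update {schema}."{table_name}" set text_content = %s where _key = %s',
                    (text, _key),
                )
        conn.commit()
    finally:
        conn.close()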
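Fan-out: update_translations_in_parallel relies on Prefect task mapping, where .map() submits one task run per batch and unmapped() marks arguments that are passed whole to every run. A self-contained toy version using the default task runner (the series itself configures a DaskTaskRunner):

from prefect import flow, task, unmapped

@task
def process_batch(batch: list, schema: str) -> int:
    print(f"{schema}: processing {len(batch)} rows")
    return len(batch)

@flow
def demo_fanout():
    batches = [[1, 2, 3], [4, 5], [6]]
    # One task run per batch; `schema` is shared by all runs.
    process_batch.map(batches, unmapped("dev_stage"))

if __name__ == "__main__":
    demo_fanout()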
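Hierarchy table: generate_hierarchy.preprocess_graph turns the navigation edges into a parent-to-children mapping before loading it into dev_stage. A toy illustration with made-up edge rows, assuming each row carries a parent uid and a child uid; the real rows come from dev_raw.super_nav_details_edges_arangodb and their exact column layout is not shown in this series:

from collections import defaultdict

edges = [
    ("sutta", "dn"),
    ("dn", "dn1"),
    ("dn", "dn2"),
    ("sutta", "mn"),
]

parent_to_child = defaultdict(list)
child_uids = set()
for parent_uid, child_uid in edges:
    parent_to_child[parent_uid].append(child_uid)
    child_uids.add(child_uid)

roots = [uid for uid in parent_to_child if uid not in child_uids]
print(dict(parent_to_child))  # {'sutta': ['dn', 'mn'], 'dn': ['dn1', 'dn2']}
print(roots)                  # ['sutta']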