Skip to content
This repository has been archived by the owner on Jan 26, 2021. It is now read-only.

Implemented ETL for covidtracking.com and rt.live data sources to a c… #37

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions covid/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,10 @@ def extract_cdc_beds_historical_data(credentials):
cdc_historical_df = cdc_historical_df.set_index(STATE_FIELD)

return cdc_historical_df


def extract_rt_live_data(
    rt_live_url="https://d14wlfuexuxgcm.cloudfront.net/covid/rt.csv",
):
    """Read the rt.live CSV into a DataFrame.

    Args:
        rt_live_url: Where to read the CSV from. Defaults to the URL this pipeline
            uses for rt.live data. Anything `pd.read_csv` accepts (URL, local path,
            or file-like object) may be passed instead, e.g. a fixture for offline
            runs.

    Returns:
        The parsed CSV as a pandas DataFrame, with no further processing applied.
    """
    return pd.read_csv(rt_live_url)
25 changes: 24 additions & 1 deletion covid/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@
]


def transform_covidtracking_data(covidtracking_df):
def transform_covidtracking_data_to_cdc(covidtracking_df):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I'm glad this is renamed! Nit: maybe rename it to ...to_cdc_criteria, since that's a bit more accurate about what's happening.

"""Transforms data from https://covidtracking.com/ and calculates CDC Criteria 1 (A, B, C, D) and 2 (A, B, C, D)."""
# Rename state field into column called "State" instead of "state".
covidtracking_df = covidtracking_df.rename(
Expand Down Expand Up @@ -944,6 +944,20 @@ def transform_covidtracking_data(covidtracking_df):
return covidtracking_df


def transform_covidtracking_data_to_states_historical(covidtracking_df):
    """Format https://covidtracking.com/ data for the homepage sheet by adding a key column.

    The key for each row is `str(date) + str(state)`, and it is inserted as the
    first column of the result.

    Args:
        covidtracking_df: DataFrame containing `date` and `state` columns.

    Returns:
        A copy of `covidtracking_df` with a leading `key` column. The input frame
        is not modified.

    Raises:
        KeyError: If the `date` or `state` column is missing.
    """
    covidtracking_historical_df = covidtracking_df.copy()
    # Build the keys positionally instead of with a row-wise `DataFrame.apply`:
    # on an empty frame `apply(axis=1)` hands back a DataFrame rather than a
    # Series, which cannot be inserted as a single column.
    keys = [
        str(date) + str(state)
        for date, state in zip(
            covidtracking_historical_df["date"],
            covidtracking_historical_df["state"],
        )
    ]
    covidtracking_historical_df.insert(0, "key", keys)
    return covidtracking_historical_df


def transform_covidtracking_data_to_states_current(covidtracking_df):
    """Format https://covidtracking.com/ data for the homepage sheet by removing `date`.

    Returns a new DataFrame without the `date` column; the frame passed in is
    left unchanged.
    """
    without_date_df = covidtracking_df.drop(columns=["date"])
    return without_date_df


def transform_cdc_ili_data(ili_df):
"""Transforms data from https://gis.cdc.gov/grasp/fluview/fluportaldashboard.html and calculates CDC Criteria 5
(A, B, C).
Expand Down Expand Up @@ -1228,3 +1242,12 @@ def indication_of_rebound(series_):
indicator = "Rebound"

return indicator


def transform_rtlive_data(rtlive_df):
    """Format rt.live data for the homepage sheet by adding a key column.

    The key for each row is the `date` value converted to a string with its
    dashes removed, followed by the `region` value converted to a string. It is
    inserted as the first column of the result.

    Args:
        rtlive_df: DataFrame containing `date` and `region` columns.

    Returns:
        A copy of `rtlive_df` with a leading `key` column. The input frame is
        not modified.

    Raises:
        KeyError: If the `date` or `region` column is missing.
    """
    transformed_rtlive_df = rtlive_df.copy()
    # Build the keys positionally instead of with a row-wise `DataFrame.apply`:
    # on an empty frame `apply(axis=1)` hands back a DataFrame rather than a
    # Series, which cannot be inserted as a single column.
    keys = [
        str(date).replace("-", "") + str(region)
        for date, region in zip(
            transformed_rtlive_df["date"], transformed_rtlive_df["region"]
        )
    ]
    transformed_rtlive_df.insert(0, "key", keys)
    return transformed_rtlive_df
64 changes: 54 additions & 10 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from covid.constants import PATH_TO_SERVICE_ACCOUNT_KEY
from covid.extract import extract_cdc_ili_data
from covid.extract import extract_covidtracking_historical_data
from covid.extract import extract_covidtracking_historical_data, extract_covidtracking_current_data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind installing pre-commit and running it according to the instructions I just added? https://github.com/usdigitalresponse/covid-exit-strategy/pull/42/files. Not your fault, this wasn't documented yet! But it'll move these to single-line imports.

from covid.extract import extract_rt_live_data
from covid.load import get_sheets_client
from covid.load import post_dataframe_to_google_sheets
from covid.load_utils import sleep_and_log
Expand All @@ -18,7 +19,9 @@
from covid.transform import STATE_FIELD
from covid.transform import STATE_SUMMARY_COLUMNS
from covid.transform import transform_cdc_ili_data
from covid.transform import transform_covidtracking_data
from covid.transform import transform_covidtracking_data_to_cdc, transform_covidtracking_data_to_states_historical, \
transform_covidtracking_data_to_states_current
from covid.transform import transform_rtlive_data
from covid.transform_utils import calculate_state_summary

# Define the names of the tabs to upload to.
Expand All @@ -37,6 +40,11 @@
"1aHvKgCfyIlWYHgBSE26cPd5jE0yZYgctcxniyZfWpu8"
)

HOMEPAGE_SHEET_GOOGLE_WORKBOOK_KEY = "1CKcRpPqYzWo-B_xEhrSyrFDplMxhf9PaGTlpgh91jeQ"
HOME_SHEET_COVIDTRACKING_STATE_HISTORICAL_TAB_NAME = 'covidtracking.com - states - history - daily - csv'
HOME_SHEET_COVIDTRACKING_STATE_CURRENT_TAB_NAME = 'covidtracking.com - states - current - csv'
HOME_SHEET_RT_LIVE_TAB_NAME = 'rt.live - csv'

# Note: if you'd like to run the full pipeline, you'll need to generate a service account keyfile for an account
# that has been given write access to the Google Sheet.

Expand Down Expand Up @@ -86,20 +94,28 @@ def extract_transform_and_load_covid_data(post_to_google_sheets=True):
# credentials=credentials,
# )

covidtracking_df = extract_covidtracking_historical_data()
historical_covidtracking_df = extract_covidtracking_historical_data()
current_covidtracking_df = extract_covidtracking_current_data()
cdc_ili_df = extract_cdc_ili_data()
rtlive_df = extract_rt_live_data()

transformed_cdc_ili_df = transform_cdc_ili_data(ili_df=cdc_ili_df)

transformed_covidtracking_df = transform_covidtracking_data(
covidtracking_df=covidtracking_df
transformed_covidtracking_cdc_df = transform_covidtracking_data_to_cdc(
covidtracking_df=historical_covidtracking_df
)
transformed_covidtracking_historical_df = transform_covidtracking_data_to_states_historical(
covidtracking_df=historical_covidtracking_df)
transformed_covidtracking_current_df = transform_covidtracking_data_to_states_current(
covidtracking_df=current_covidtracking_df)

transformed_rtlive_df = transform_rtlive_data(rtlive_df=rtlive_df)

# Upload summary for all states.
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=calculate_state_summary(
transformed_df=transformed_covidtracking_df,
transformed_df=transformed_covidtracking_cdc_df,
columns=STATE_SUMMARY_COLUMNS,
),
workbook_key=CDC_GUIDANCE_GOOGLE_WORKBOOK_KEY,
Expand All @@ -112,7 +128,7 @@ def extract_transform_and_load_covid_data(post_to_google_sheets=True):

# Upload Criteria 1 workbook for all states.
criteria_1_summary_df = calculate_state_summary(
transformed_df=transformed_covidtracking_df, columns=CRITERIA_1_SUMMARY_COLUMNS
transformed_df=transformed_covidtracking_cdc_df, columns=CRITERIA_1_SUMMARY_COLUMNS
)

if post_to_google_sheets:
Expand All @@ -127,7 +143,7 @@ def extract_transform_and_load_covid_data(post_to_google_sheets=True):

# Upload Criteria 2 workbook for all states.
criteria_2_summary_df = calculate_state_summary(
transformed_df=transformed_covidtracking_df, columns=CRITERIA_2_SUMMARY_COLUMNS
transformed_df=transformed_covidtracking_cdc_df, columns=CRITERIA_2_SUMMARY_COLUMNS
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
Expand Down Expand Up @@ -167,7 +183,7 @@ def extract_transform_and_load_covid_data(post_to_google_sheets=True):

# Upload state summary tab for Criteria 6.
criteria_6_summary_df = calculate_state_summary(
transformed_df=transformed_covidtracking_df, columns=CRITERIA_6_SUMMARY_COLUMNS
transformed_df=transformed_covidtracking_cdc_df, columns=CRITERIA_6_SUMMARY_COLUMNS
)
if post_to_google_sheets:
post_dataframe_to_google_sheets(
Expand Down Expand Up @@ -208,12 +224,40 @@ def extract_transform_and_load_covid_data(post_to_google_sheets=True):

# Upload data for all states.
post_dataframe_to_google_sheets(
df=transformed_covidtracking_df,
df=transformed_covidtracking_cdc_df,
workbook_key=CDC_GUIDANCE_GOOGLE_WORKBOOK_KEY,
tab_name=ALL_STATE_DATA_TAB_NAME,
credentials=credentials,
)

# Upload transformed covidtracking.com historical + current data, rt.live data to respective tabs of sheet
# with data for home page.
if post_to_google_sheets:
post_dataframe_to_google_sheets(
df=transformed_covidtracking_historical_df,
workbook_key=HOMEPAGE_SHEET_GOOGLE_WORKBOOK_KEY,
tab_name=HOME_SHEET_COVIDTRACKING_STATE_HISTORICAL_TAB_NAME,
credentials=credentials,
)

sleep_and_log()

post_dataframe_to_google_sheets(
df=transformed_covidtracking_current_df,
workbook_key=HOMEPAGE_SHEET_GOOGLE_WORKBOOK_KEY,
tab_name=HOME_SHEET_COVIDTRACKING_STATE_CURRENT_TAB_NAME,
credentials=credentials
)

sleep_and_log()

post_dataframe_to_google_sheets(
df=transformed_rtlive_df,
workbook_key=HOMEPAGE_SHEET_GOOGLE_WORKBOOK_KEY,
tab_name=HOME_SHEET_RT_LIVE_TAB_NAME,
credentials=credentials,
)


if __name__ == "__main__":
# Note: for faster debugging during development, you can set `post_to_google_sheets` to `False`.
Expand Down