-
Notifications
You must be signed in to change notification settings - Fork 0
/
find_completed_runs_dag.py
157 lines (138 loc) · 5.84 KB
/
find_completed_runs_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import sequencer
import os
import datetime
import json
import time
from SampleSheet import SampleSheet
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
from airflow.models import Variable
from airflow.decorators import task
from airflow.operators.email_operator import EmailOperator
# Sequencers polled for finished runs. For every sequencer we record:
#   1) "name"      - the sequencer's name
#   2) "path"      - the directory it writes its run folders into
#   3) "last_file" - the last file it writes when a run is completed, which
#                    signals that demux can begin
# NOTE(review): "fauci" is listed twice with two different paths
# (/igo/sequencers/fauci and /igo/nfsfauci/fauci) - confirm both are intended.
_RUN_COMPLETE_MARKER = "CopyComplete.txt"
_SEQUENCER_DIRS = (
    ("ayyan", "/igo/sequencers/ayyan"),
    ("fauci", "/igo/sequencers/fauci"),
    ("diana", "/igo/sequencers/diana"),
    ("fauci", "/igo/nfsfauci/fauci"),
    ("ruth", "/igo/sequencers/ruth"),
    ("johnsawyers", "/igo/sequencers/johnsawyers"),
    ("pepe", "/igo/sequencers/pepe/output"),
    ("amelie", "/igo/sequencers/amelie/output"),
)
sequencers = {
    "sequencers": [
        {"name": seq_name, "path": seq_dir, "last_file": _RUN_COMPLETE_MARKER}
        for seq_name, seq_dir in _SEQUENCER_DIRS
    ]
}
# Find recently completed runs by looking for the last file written by the
# sequencers, then split and copy the sample sheet for each completed run and
# launch one demux_run DAG run per split sample sheet.
#
# NOTE(review): all code inside the ``with DAG`` block runs every time Airflow
# parses this file (scheduler file processing and task start-up), not once per
# hourly run - including the run search, the Variable lookups and the sample
# sheet writes. Keep it fast, and consider moving the work into tasks.
with DAG(
    dag_id='find_completed_runs',
    schedule_interval='@hourly',
    start_date=datetime.datetime(2022, 1, 1),
    catchup=False,
    tags=["find_completed_runs"],
) as dag:
    completed_runs_path = []
    # TODO - Consider making separate DAG to trigger 1 specific demux?
    # If this file names a run directory, only that run is demuxed and the
    # sequencer search below is skipped.
    demux_special = "/igo/sequencers/run_to_demux.txt"
    if os.path.exists(demux_special):
        # ``with`` closes the handle even if the read raises.
        with open(demux_special, "r") as run_to_demux_file:
            demux = run_to_demux_file.readline().strip()
        if demux:
            completed_runs_path.append(demux + "/RTAComplete.txt")
    # TODO check if path exists, if not then done
    if not completed_runs_path:
        print("Processing sequencer list: {}".format(sequencers))
        # should match schedule_interval above
        # NOTE(review): Variable.get returns a str when the Variable is set but
        # the int default (60) otherwise - confirm which type
        # sequencer.find_completed_runs expects.
        time_to_search = Variable.get("completed_run_search_interval_mins", default_var=60)
        print("Searching for runs completed in the last {} minutes".format(time_to_search))
        completed_runs_path = sequencer.find_completed_runs(sequencers, time_to_search)

    # Every demux trigger created in this parse shares one UTC base time (the
    # timestamp is sent with a "Z" suffix, so it must really be UTC) plus its
    # own whole-second offset, so no two triggers share an execution_date.
    # This replaces a parse-time time.sleep(5) per run, which only served to
    # make datetime.now() differ between runs while stalling DAG parsing; the
    # HTTP calls themselves run later, as independent tasks.
    trigger_base_time = datetime.datetime.now(datetime.timezone.utc)
    trigger_count = 0
    # Run folder names already handled; a run reported twice (e.g. through two
    # sequencer paths) would otherwise create duplicate task_ids.
    seen_runs = set()
    for run_path in completed_runs_path:
        # remove file name from path, for example: /igo/sequencers/michelle/211129_MICHELLE_0461_AHMJFJDSX2/CopyComplete.txt
        completed_run_path = os.path.dirname(run_path)
        # ex: 211129_MICHELLE_0461_AHMJFJDSX2
        completed_run = str(os.path.basename(completed_run_path))
        if completed_run in seen_runs:
            print("Skipping duplicate completed run:" + completed_run_path)
            continue
        seen_runs.add(completed_run)
        print("Copying sample sheet(s) for completed run:" + completed_run_path)
        # drop the first 7 characters (the "211129_" date prefix in the example above)
        run_name_only = completed_run[7:]  # ex: MICHELLE_0461_AHMJFJDSX2
        samplesheet_name = "SampleSheet_" + completed_run + ".csv"
        orig_samplesheet_dir = Variable.get("original_samplesheet_dir", default_var="/rtssdc/mohibullahlab/LIMS/LIMS_SampleSheets/")
        orig_samplesheet = orig_samplesheet_dir + samplesheet_name
        dest_samplesheet_dir = Variable.get("destination_samplesheet_dir", default_var="/igo/work/igo/SampleSheetCopies/")
        dest_samplesheet = dest_samplesheet_dir + samplesheet_name
        print("Reading the LIMS sample sheet {}".format(orig_samplesheet))
        ss_orig = SampleSheet(orig_samplesheet)
        # the split sheets are written under the destination directory, not the LIMS copy
        ss_orig.path = dest_samplesheet
        ss_list = ss_orig.split_sample_sheet()
        # The email body is HTML, where "\n" renders as a plain space; use <br>
        # so each split sample sheet path is on its own line in Data Team emails.
        ss_list_str = "<br>" + "".join(sheet.path + "<br>" for sheet in ss_list)
        email_to = Variable.get("email_to", default_var="skigodata@mskcc.org")
        send_demux_email = EmailOperator(
            task_id='send_demux_email' + run_name_only,
            to=email_to,
            subject='IGO Cluster New Run Sent for Demuxing',
            html_content="<h3>{}</h3> sent to DRAGEN split into: {} ".format(run_name_only, ss_list_str),
            dag=dag
        )
        for split_sheet in ss_list:
            trigger_count += 1
            print("Saving sample sheet {}".format(split_sheet.path))
            split_sheet.write_csv()
            demux_conf = {
                'dragen_demux': 'False',
                'samplesheet': split_sheet.path,
                'sequencer_path': completed_run_path,
            }
            exec_time = (trigger_base_time + datetime.timedelta(seconds=trigger_count)).strftime("%Y-%m-%dT%H:%M:%SZ")
            # Airflow required arguments to trigger a dag - execution date and conf arguments.
            # json.dumps also escapes any quotes/backslashes in the paths.
            dag_json = json.dumps({"execution_date": exec_time, "conf": demux_conf})
            print("Calling demux with execution time and args:" + dag_json)
            trigger_dag_demux = SimpleHttpOperator(
                task_id="demux_" +
                str(os.path.basename(split_sheet.path)).replace(".csv", "").replace("SampleSheet_", ""),
                http_conn_id='airflow-api',
                endpoint='api/v1/dags/demux_run/dagRuns',
                method='POST',
                headers={'Content-Type': 'application/json'},
                data=dag_json,
            )
            # the notification email task runs before each demux trigger
            send_demux_email >> trigger_dag_demux