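"""
example-bs4.py

Prefect flow that scrapes X-Files episode transcripts from
http://www.insidethex.co.uk/ with BeautifulSoup and stores the dialogue
in a local SQLite database via SQLAlchemy.

The flow runs on a daily CronClock schedule, is packaged in Docker storage,
and can be executed locally or registered with Prefect Cloud (see the
__main__ block at the bottom of this file).
"""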
import typing as T
import datetime
import requests
from bs4 import BeautifulSoup
from prefect import task, Flow, Parameter, unmapped
from prefect.engine import cache_validators
from prefect.engine.result_handlers import LocalResultHandler
from prefect.environments.storage import Docker
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
import sqlalchemy as sa


@task(
    name="Create DB",
    tags=['db']
)
def create_db(filename: T.Union[str, Parameter]) -> sa.Table:
    """
    Create the output table (if it does not already exist) and return it
    """
    meta = sa.MetaData(
        bind=sa.create_engine(f"sqlite:///{filename}")
    )
    tbl = sa.Table(
        'XFILES',
        meta,
        sa.Column(
            'EPISODE',
            sa.UnicodeText
        ),
        sa.Column(
            'CHARACTER',
            sa.UnicodeText
        ),
        sa.Column(
            'TEXT',
            sa.UnicodeText
        )
    )
    tbl.create(checkfirst=True)
    return tbl
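
# For reference, the table above corresponds roughly to the following SQLite
# DDL (sa.UnicodeText maps to TEXT on SQLite):
#
#     CREATE TABLE IF NOT EXISTS "XFILES" (
#         "EPISODE"   TEXT,
#         "CHARACTER" TEXT,
#         "TEXT"      TEXT
#     )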


@task
def insert_episode(episode: T.Tuple, tbl: sa.Table):
    """
    Insert one episode's dialogue into the database table
    """
    title, dialogue = episode
    values = [
        (title, *row)
        for row in dialogue
    ]
    if not values:
        # nothing was parsed for this episode; skip the insert
        return
    stmt = tbl.insert().values(values)
    with tbl.bind.begin() as conn:
        conn.execute(stmt)
    return
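
# Illustrative input (hypothetical values):
#
#     episode = (
#         "<episode title>",
#         [("<CHARACTER>", "<line of dialogue>"), ...],
#     )
#
# which is flattened into rows of (EPISODE, CHARACTER, TEXT).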


@task
def create_episode_list(base_url, main_html, bypass):
    """
    Given the main page html, creates a list of episode URLs
    """
    if bypass:
        return [base_url]
    main_page = BeautifulSoup(main_html, 'html.parser')
    episodes = []
    for link in main_page.find_all('a'):
        url = link.get('href')
        if 'transcrp/scrp' in (url or ''):
            episodes.append(base_url + url)
    return episodes
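
# Example (hypothetical href): a link whose href contains 'transcrp/scrp',
# such as 'transcrp/scrp101.htm', is turned into
# 'http://www.insidethex.co.uk/' + 'transcrp/scrp101.htm'.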


@task(
    # max_retries=3,
    # retry_delay=datetime.timedelta(minutes=5),
    # cache_for=datetime.timedelta(minutes=10),
    # cache_validator=cache_validators.all_inputs,
    tags=["web"]
)
def retrieve_url(url):
    """
    Given a URL (string), retrieves html and
    returns the html as a string.
    """
    response = requests.get(url)
    if response.ok:
        return response.text
    else:
        raise ValueError("{} could not be retrieved.".format(url))
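
# Note: uncommenting the arguments in the @task decorator above enables
# automatic retries and input-based caching; they are what the `datetime`
# and `cache_validators` imports at the top of the file are for.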


@task
def scrape_dialogue(episode_html):
    """
    Given a string of html representing an episode page,
    returns a tuple of (title, [(character, text)]) of the
    dialogue from that episode
    """
    episode = BeautifulSoup(episode_html, 'html.parser')
    title = episode.title.text.rstrip(' *').replace("'", "''")
    convos = episode.find_all('b') or episode.find_all('span', {'class': 'char'})
    dialogue = []
    for item in convos:
        who = item.text.rstrip(': ').rstrip(' *').replace("'", "''")
        what = str(item.next_sibling).rstrip(' *').replace("'", "''")
        dialogue.append((who, what))
    return (title, dialogue)
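
# The transcript pages appear to mark speaker names with either <b> tags or
# <span class="char"> elements; the spoken line is taken from the node that
# immediately follows the speaker tag. Return shape (hypothetical values):
#
#     ("<episode title>", [("MULDER", "<line>"), ("SCULLY", "<line>"), ...])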


with Flow(
    name="xfiles",
    schedule=Schedule(
        clocks=[
            # TODO: specify the schedule you want this to run, and with what parameters
            # https://docs.prefect.io/core/concepts/schedules.html
            CronClock(
                cron='0 0 * * *',
                parameter_defaults=dict(
                    url='http://www.insidethex.co.uk/'
                )
            ),
        ]
    ),
    storage=Docker(
        # TODO: change to your docker registry:
        # https://docs.prefect.io/cloud/recipes/configuring_storage.html
        registry_url='szelenka',
        # TODO: 'pin' the exact versions you used on your development machine
        python_dependencies=[
            'requests==2.23.0',
            'beautifulsoup4==4.8.2',
            'sqlalchemy==1.3.15'
        ],
    ),
    # TODO: specify how you want to handle results
    # https://docs.prefect.io/core/concepts/results.html#results-and-result-handlers
    result_handler=LocalResultHandler()
) as flow:
    _url = Parameter("url", default='http://www.insidethex.co.uk/')
    _bypass = Parameter("bypass", default=False, required=False)
    _db_file = Parameter("db_file", default='xfiles_db.sqlite', required=False)

    # scrape the website
    _home_page = retrieve_url(_url)
    _episodes = create_episode_list(
        base_url=_url,
        main_html=_home_page,
        bypass=_bypass
    )
    _episode = retrieve_url.map(_episodes)
    _dialogue = scrape_dialogue.map(_episode)

    # insert into SQLite table
    _db = create_db(
        filename=_db_file
    )
    _final = insert_episode.map(
        episode=_dialogue,
        tbl=unmapped(_db)
    )
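
# Resulting task graph: the home page is fetched once, the per-episode URLs
# are fanned out with .map, and every mapped insert_episode call shares the
# single (unmapped) table returned by create_db.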


if __name__ == '__main__':
    # debug the local execution of the flow
    import sys
    import argparse
    from prefect.utilities.debug import raise_on_exception

    # get any CLI arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--visualize', action='store_true', default=False)
    parser.add_argument('--deploy', action='store_true', default=False)
    p = parser.parse_args(sys.argv[1:])

    if p.visualize:
        # view the DAG
        flow.visualize()

    # execute the Flow manually, not on the schedule
    with raise_on_exception():
        if p.deploy:
            # TODO: hack for https://github.com/PrefectHQ/prefect/issues/2165
            flow.result_handler.dir = '/root/.prefect/results'
            flow.register(
                # TODO: specify the project_name on Prefect Cloud you're authenticated to
                project_name="Sample Project Name",
                build=True,
                # TODO: specify any labels for Agents
                labels=[
                    'sample-label'
                ]
            )
        else:
            flow.run(
                parameters=dict(
                    url='http://www.insidethex.co.uk/'
                ),
                run_on_schedule=False
            )
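
# Example local invocations (using the boolean flags defined above):
#
#     python example-bs4.py               # run the flow once, locally
#     python example-bs4.py --visualize   # also render the DAG first
#     python example-bs4.py --deploy      # build storage and register with Prefect Cloud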