forked from OpsLevel-sales-demo/shopping_cart
-
Notifications
You must be signed in to change notification settings - Fork 2
/
code
359 lines (267 loc) · 14.9 KB
/
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
import json, os, sys, requests, datetime, pandas, sqlalchemy, dotenv, time
import re
def get_domain(url):
    """Extract the registrable domain (e.g. "example.com") from a company URL.

    Strips any scheme ("https://"), path, and query string before splitting,
    then keeps the label immediately before "com" plus everything after it.
    For other TLDs it falls back to the last two labels (NOTE: this
    mishandles multi-part TLDs such as "co.uk" — kept for compatibility
    with the original behavior).
    """
    # Reduce a full URL like "https://sub.example.com/jobs?id=1" to its
    # hostname; the previous version split the raw URL and broke on paths.
    host = re.sub(r"^[a-zA-Z][a-zA-Z0-9+.\-]*://", "", str(url))
    host = host.split("/")[0].split("?")[0]
    terms = host.split(".")
    if "com" in terms:
        # e.g. ["sub", "example", "com"] -> "example.com"; max(...) guards
        # against "com" being the first label (index would underflow to -1)
        domain = ".".join(terms[max(terms.index("com") - 1, 0):])
    else:
        domain = ".".join(terms[-2:])
    return domain
def sourcestack_data(table_name, result_type, primary_key, settings, engine):
    """Query the Sourcestack API and upload the result set as a MySQL table.

    Args:
        table_name: table-name prefix; the created table is
            f"{table_name}sourcestack".
        result_type: Sourcestack endpoint to hit ("jobs" or "companies").
        primary_key: column to promote to the table's primary key.
        settings: parsed settings.json dict; reads "sourcestack_params"
            and "save_csv".
        engine: SQLAlchemy engine connected to the RDS database.

    Returns:
        A (dataframe, succeeded) pair. dataframe is None on failure and for
        count-only queries; succeeded distinguishes those two cases.
    """
    sourcestack_params = settings["sourcestack_params"]
    # get the data from sourcestack
    r = requests.post(
        f"https://sourcestack-api.com/{result_type}",
        headers={"X-API-KEY": os.getenv("SOURCESTACK_KEY"), "Content-Type":"application/json"},
        data=json.dumps(sourcestack_params)
    )
    if not r.ok:
        print(f"Could not access Sourcestack: {r.json()}")
        return None, False
    # the search was only count, print that and terminate
    if sourcestack_params["count_only"]:
        print(f"The search has {r.json()['entry_count']} results. Run again with count False to access them.")
        return None, True
    # ensure data was received: the response body carries a link to a CSV
    try:
        sourcestack_df = pandas.read_csv(r.json()['link'], quotechar='"', encoding='utf-8')
    except Exception as e:
        print(f"Sourcestack response did not include data: {e}")
        return None, False
    print("Got Sourcestack data.")
    print(f"Sourcestack credits remaining: {r.headers['X-SOURCESTACK-CREDITS-REMAINING']}")
    # create MySQL table data
    # FIX: the previous version recomputed table_name here with a fresh
    # timestamp, which could disagree with the prefix main() computed and
    # later passes to clearbit_data(); the caller-supplied prefix is used.
    sourcestack_columns_list = list(sourcestack_df.columns)
    if result_type == "jobs":
        # get domain from company url ("ignore" skips missing values)
        sourcestack_df["domain"] = sourcestack_df["company_url"].map(get_domain, "ignore")
        sourcestack_columns_list.append("domain")
    # create a data type dictionary for the fields
    sourcestack_data_types = {
        "string":["domain","tld","company_name","legal_name","title","h1","description","author","language","theme_name","theme_parent_id","ga_tag","city","region","country","postal_code",
                  "job_name","hours","department","seniority","education","company_legal_name","job_description","company_description","job_location","board_platform"],
        "array":["tags_matched","tag_categories","categories","keywords","emails","phones","addresses","currencies","all_languages","all_countries","customer_urls","news_urls","comparison_urls","integration_urls","app_urls","open_job_names","open_job_departments","open_job_educations","open_job_seniorities","open_job_tags_matched","open_job_tag_categories"],
        "int":["alexa_rank","umbrella_rank","majestic_rank","tranco_rank","domcop_rank","sku_vendor_count","sku_count","status_code","founded_year","open_job_count"],
        "datetime":["founded_dt","last_indexed","first_indexed","last_modified","job_created_at","job_published_at","job_updated_at"]
    }
    sourcestack_column_types = {}
    for column in sourcestack_columns_list:
        if column in sourcestack_data_types["array"]: sourcestack_column_types[column] = "List"
        elif column in sourcestack_data_types["int"]: sourcestack_column_types[column] = sqlalchemy.types.Integer
        elif column in sourcestack_data_types["datetime"]: sourcestack_column_types[column] = sqlalchemy.types.DateTime
        elif "url" in column: sourcestack_column_types[column] = sqlalchemy.types.String(50)
        elif column in sourcestack_data_types["string"]: sourcestack_column_types[column] = sqlalchemy.types.String(50)
        else:
            sourcestack_column_types[column] = sqlalchemy.types.Text
            print(f"Unrecognized data field while parsing sourcestack fields: {column}. Treating as text.")
    # convert all array fields into comma separated strings
    # NOTE(review): assumes these cells hold list-like values; cells read
    # back via read_csv may already be strings — confirm against the feed.
    for column in sourcestack_columns_list:
        if sourcestack_column_types[column] == "List":
            sourcestack_df[column] = [",".join(map(str, l)) for l in sourcestack_df[column]]
            sourcestack_column_types[column] = sqlalchemy.types.Text
    # create the MySQL table; FIX: the connection is now closed even when
    # to_sql raises (previously it leaked on the error-return path)
    try:
        with engine.connect() as connection:
            sourcestack_df.to_sql(table_name + "sourcestack", connection, dtype = sourcestack_column_types)
    except Exception as e:
        print(f"Could not create table {table_name}sourcestack: {e}")
        return None, False
    # add a primary key to the table (SQLAlchemy 1.x-style engine.execute)
    try:
        engine.execute(f"ALTER TABLE `{table_name}sourcestack` ADD PRIMARY KEY({primary_key})")
    except Exception as e:
        print(f"Could not add primary key to {table_name}sourcestack: {e}")
        return None, False
    print("Uploaded Sourcestack data to AWS.")
    if settings["save_csv"]:
        # "/" and ":" from the timestamp prefix are not valid in file names
        sourcestack_df.to_csv(os.path.join(sys.path[0], "csv/", table_name.replace("/", "-").replace(":", "-") + "sourcestack.csv"))
        print("Saved Sourcestack csv.")
    return sourcestack_df, True
def clearbit_data(sourcestack_df, table_name, settings, engine):
    """Enrich Sourcestack companies with Clearbit data and upload to MySQL.

    Args:
        sourcestack_df: dataframe from sourcestack_data(); must contain
            "domain" and "company_name" columns.
        table_name: table-name prefix; the created table is
            f"{table_name}clearbit".
        settings: parsed settings.json dict; reads "clearbit_fields" and
            "save_csv".
        engine: SQLAlchemy engine connected to the RDS database.

    Returns:
        True on success, False on any failure.
    """
    # get clearbit data
    clearbit_headers = {"Authorization": f"Bearer {os.getenv('CLEARBIT_KEY')}"}
    clearbit_fields = settings["clearbit_fields"]
    # domain and name are always needed: domain is the primary key and name
    # is the fallback lookup key
    if not "domain" in clearbit_fields: clearbit_fields.append("domain")
    if not "name" in clearbit_fields: clearbit_fields.append("name")
    # create new dataframe of unique (domain, name) pairs
    clearbit_df = sourcestack_df[["domain", "company_name"]].copy().rename(columns={"company_name":"name"}).drop_duplicates(subset=["domain","name"])
    clearbit_df = clearbit_df.where(clearbit_df != "", other=None)
    # drop rows whose domain repeats, but keep every null-domain row
    duplicate_domain = clearbit_df.duplicated(subset=["domain"])
    duplicate_domain[clearbit_df["domain"].isnull()] = False
    clearbit_df = clearbit_df[~duplicate_domain].reset_index().dropna(subset=["domain", "name"],how="all")
    # create lists to add to the dataframe
    clearbit_data = {}
    for field in clearbit_fields:
        clearbit_data[field] = []
    print(f"Starting to access clearbit data. This will take {round(len(clearbit_df.index) * 0.1, 1)}-{round(len(clearbit_df.index) * 0.2, 1)} seconds.")
    completed_rows = 0
    for index, row in clearbit_df.iterrows():
        try:
            # get a value for domain, looking it up by name when missing
            if row["domain"] is None:
                r = requests.get(f"https://company.clearbit.com/v1/domains/find?name={row['name']}", headers=clearbit_headers)
                if not r.ok:
                    print(f"Clearbit request failed: {r.json()}")
                domain = r.json()["domain"]
                # sleep to ensure rate limit (600 requests/min) is not hit
                time.sleep(0.1)
            else:
                domain = row["domain"]
            # get clearbit company data
            r = requests.get(f"https://company.clearbit.com/v2/companies/find?domain={domain}", headers=clearbit_headers)
            if not r.ok:
                print(f"Clearbit request failed: {r.json()}")
            clearbit_company = r.json()
            # add to the data lists ("a.b" fields index into nested dicts)
            for field in clearbit_fields:
                if "." in field:
                    clearbit_data[field].append(clearbit_company[field.split(".")[0]][field.split(".")[1]])
                else:
                    clearbit_data[field].append(clearbit_company[field])
            # sleep to ensure rate limit (600 requests/min) is not hit
            time.sleep(0.1)
        except Exception as e:
            print(f"Failed while trying access Clearbit")
            print(e)
            print("Failed company:")
            print(row)
            # FIX: discard any values appended before the exception so every
            # column list stays exactly one entry per processed row;
            # previously a mid-loop failure left the lists misaligned and the
            # column assignment below would raise.
            for field in clearbit_fields:
                del clearbit_data[field][completed_rows:]
                clearbit_data[field].append(None)
            print()
        completed_rows += 1
    # add the data lists to the dataframe
    for field in clearbit_fields:
        clearbit_df[field] = clearbit_data[field]
    print("Got Clearbit data.")
    # get datatypes
    clearbit_data_types = {
        "string":["id", "name", "legalName", "domain", "category.sector", "category.industryGroup", "category.industry", "category.subIndustry", "category.sicCode", "category.naicsCode", "description", "location", "timeZone", "geo.streetNumber", "geo.streetName", "geo.subPremise", "geo.City", "geo.state", "geo.stateCode", "geo.postalCode", "geo.country", "geo.countryCode", "geo.lat", "geo.lng", "identifiers.usEIN", "metrics.employeesRange", "metrics.estimatedAnnualRevenue", "facebook.handle", "linkedin.handle", "twitter.handle", "twitter.bio", "twitter.location", "twitter.site", "twitter.avatar", "crunchbase.handle", "logo", "emailProvider", "type", "phone", "parent.domain", "ultimateParent.domain", "indexedAt"],
        "array":["domainAliases", "site.phoneNumbers", "site.emailAddresses", "tags", "tech", "techCategories"],
        "int":["foundedYear", "utcOffset", "metrics.raised", "metrics.alexaUsRank", "alexaGlobalRank", "metrics.employees", "metrics.marketCap", "metrics.annualRevenue", "metrics.fiscalYearEnd", "twitter.id", "twitter.followers", "twitter.following"],
    }
    clearbit_column_types = {}
    for column in clearbit_fields:
        if column in clearbit_data_types["array"]: clearbit_column_types[column] = "List"
        elif column in clearbit_data_types["int"]: clearbit_column_types[column] = sqlalchemy.types.Integer
        elif column in clearbit_data_types["string"]: clearbit_column_types[column] = sqlalchemy.types.String(50)
        else:
            clearbit_column_types[column] = sqlalchemy.types.Text
            print(f"Unrecognized data field while parsing clearbit fields: {column}. Treating as text.")
    # convert all array fields into comma separated strings ("ignore" skips
    # missing values)
    for column in clearbit_fields:
        if clearbit_column_types[column] == "List":
            clearbit_df[column] = clearbit_df[column].map(lambda val: ",".join(val), "ignore")
            clearbit_column_types[column] = sqlalchemy.types.Text
    # ensure no duplicate domains or null rows
    clearbit_df = clearbit_df.drop_duplicates("domain").dropna(subset=clearbit_df.columns.difference(["index"]),how="all")
    # create the MySQL table; FIX: the connection is now closed even when
    # to_sql raises (previously it leaked on the error-return path)
    try:
        with engine.connect() as connection:
            clearbit_df.to_sql(table_name + "clearbit", connection, dtype=clearbit_column_types)
    except Exception as e:
        print(f"Could not create table {table_name}clearbit: {e}")
        return False
    # add a primary key to the table (SQLAlchemy 1.x-style engine.execute)
    try:
        engine.execute(f"ALTER TABLE `{table_name}clearbit` ADD PRIMARY KEY(domain)")
    except Exception as e:
        print(f"Could not add primary key to table {table_name}clearbit: {e}")
        return False
    print("Uploaded Clearbit data to AWS.")
    if settings["save_csv"]:
        clearbit_df.to_csv(os.path.join(sys.path[0], "csv/", table_name.replace("/", "-").replace(":", "-") + "clearbit.csv"))
        # FIX: this previously claimed to have saved the Sourcestack csv
        print("Saved Clearbit csv.")
    return True
def main():
    """Run the full pipeline: Sourcestack query -> Clearbit enrichment -> RDS.

    Reads settings.json from the script's directory, creates the
    "<timestamp>_<result_type>_sourcestack" and "..._clearbit" tables, and
    optionally saves CSV copies. Returns True on success, False otherwise.
    """
    dotenv.load_dotenv()
    # get settings (search result type and search params)
    try:
        with open(os.path.join(sys.path[0], "settings.json")) as settings_json:
            settings = json.load(settings_json)
    except Exception as e:
        print(f"Could not open settings.json: {e}")
        return False
    result_type = settings["result_type"]
    # sourcestack_params dict must contain:
    #   one of [name, url, category, uses_product, uses_category, filters]
    #   (str for most, or a list of dicts for filters); the filter format is
    #   [{"field": "[field name]", "operator": "[operator]", "value": "[value]"}, ...]
    # dict may contain:
    #   exact (bool)
    #   fields (list[str])
    #   order_by (str)
    #   count_only (bool)
    #   limit (int)
    # ensure the primary key is present in the sourcestack query
    if result_type == "companies":
        primary_key = "domain"
    elif result_type == "jobs":
        primary_key = "post_url"
    else:
        # FIX: an unknown result_type previously fell through both branches
        # and crashed later with a NameError on primary_key
        print(f"Unknown result_type '{result_type}': expected 'companies' or 'jobs'.")
        return False
    # ensure information for clearbit is in there
    if not "company_url" in settings["sourcestack_params"]["fields"]: settings["sourcestack_params"]["fields"].append("company_url")
    if not "company_name" in settings["sourcestack_params"]["fields"]: settings["sourcestack_params"]["fields"].append("company_name")
    if not primary_key in settings["sourcestack_params"]["fields"]: settings["sourcestack_params"]["fields"].append(primary_key)
    # timestamped prefix shared by both tables created in this run
    table_name = f"{datetime.datetime.now(datetime.timezone.utc).strftime('%Y/%m/%d_%H:%M:%S')}_{result_type}_"
    # format the URL for the RDS connection
    url = f"mysql+mysqlconnector://{os.getenv('RDS_USER')}:{os.getenv('RDS_PASSWORD')}@{os.getenv('RDS_ENDPOINT')}:{os.getenv('RDS_PORT')}/{os.getenv('RDS_DB_NAME')}"
    # create the sqlalchemy engine and verify the RDS connection works
    try:
        engine = sqlalchemy.create_engine(url)
        connection = engine.connect()
        connection.close()
    except Exception as e:
        print(f"Could not connect to RDS: {e}")
        return False
    sourcestack_df, succeeded = sourcestack_data(table_name, result_type, primary_key, settings, engine)
    if sourcestack_df is None:
        # either a failure or a count-only query; succeeded distinguishes them
        return succeeded
    succeeded = clearbit_data(sourcestack_df, table_name, settings, engine)
    if not succeeded: return False
    # close RDS engine
    engine.dispose()
    print(f"Created tables {table_name}sourcestack and {table_name}clearbit successfully!")
    return True
# TODO: verify sourcestack schema
# TODO: market OpsLevel
def retry_sourcestack():
    """Re-run the Clearbit step against the most recent sourcestack table.

    Useful when sourcestack_data() succeeded but clearbit_data() failed:
    the Sourcestack rows are reloaded from RDS instead of re-querying the
    API (and spending credits). Returns True on success, False otherwise.
    """
    dotenv.load_dotenv()
    try:
        with open(os.path.join(sys.path[0], "settings.json")) as settings_json:
            settings = json.load(settings_json)
    except Exception as e:
        print(f"Could not open settings.json: {e}")
        return False
    # format the URL for the RDS connection
    url = f"mysql+mysqlconnector://{os.getenv('RDS_USER')}:{os.getenv('RDS_PASSWORD')}@{os.getenv('RDS_ENDPOINT')}:{os.getenv('RDS_PORT')}/{os.getenv('RDS_DB_NAME')}"
    # create the sqlalchemy engine and verify the RDS connection works
    try:
        engine = sqlalchemy.create_engine(url)
        connection = engine.connect()
        connection.close()
    except Exception as e:
        print(f"Could not connect to RDS: {e}")
        return False
    # table names start with a UTC timestamp, so a reverse sort puts the
    # most recently created table first (SQLAlchemy 1.x-style engine.execute)
    all_tables = engine.execute("SHOW TABLES").fetchall()
    all_tables.sort(reverse=True)
    last_table = all_tables[0][0]
    if "clearbit" in last_table:
        print("The most recent table is a clearbit table. The retry function cannot run.")
        return False
    # keep only the "<timestamp>_<result_type>_" prefix
    table_name = last_table[0:last_table.find("sourcestack")]
    # FIX: the connection now closes even if read_sql raises; also removed
    # two unused locals (clearbit_headers, result_type) from the original
    with engine.connect() as connection:
        sourcestack_df = pandas.read_sql(f"{table_name}sourcestack", connection)
    succeeded = clearbit_data(sourcestack_df, table_name, settings, engine)
    if not succeeded: return False
    # close RDS engine
    engine.dispose()
    print(f"Created table {table_name}clearbit successfully!")
    return True