-
Notifications
You must be signed in to change notification settings - Fork 2
/
sqlite_tools.py
executable file
·382 lines (339 loc) · 12 KB
/
sqlite_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Functions for working with SQLite databases
"""
import sqlite3
import hashlib
import csv
# ~~~~~ FUNCTIONS ~~~~~ #
def get_table_names(conn):
    """
    Lists the names of all tables recorded in the database

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database

    Returns
    -------
    list
        the table names (str) found in ``sqlite_master``
    """
    query = "SELECT name FROM sqlite_master WHERE type='table';"
    with conn:
        rows = conn.cursor().execute(query).fetchall()
    # every row is a tuple of the selected columns; flatten them into one list
    table_names = []
    for row in rows:
        table_names.extend(row)
    return table_names
def create_table(conn, table_name, col_name, col_type = "TEXT", is_primary_key = False):
    """
    Creates a table with a single starting column, unless the table already exists

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the table to create
    col_name: str
        the name of the first column to create in the table
    col_type: str
        the SQLite data type for the column
    is_primary_key: bool
        whether or not the column is the primary key for the table

    Notes
    -----
    ``table_name``, ``col_name`` and ``col_type`` are placed into the SQL statement as-is, without quoting.
    """
    # pick the column clause first, then glue it onto the CREATE statement
    column_template = ' ({0} {1} PRIMARY KEY)' if is_primary_key else ' ({0} {1})'
    statement = 'CREATE TABLE IF NOT EXISTS {0}'.format(table_name)
    statement += column_template.format(col_name, col_type)
    with conn:
        conn.cursor().execute(statement)
def get_colnames(conn, table_name):
    """
    Reads the column names of a table

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the table to get column names from

    Returns
    -------
    list
        the column names (str), in the order reported by the cursor description
    """
    query = 'select * from {0}'.format(table_name)
    with conn:
        description = conn.execute(query).description
    # the first field of every description entry is the column name
    return [column[0] for column in description]
def add_column(conn, table_name, col_name, col_type = "TEXT", default_val = None):
    """
    Adds a column to a table; does nothing if the column already exists

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the table in which to create the column
    col_name: str
        the name of the column to create
    col_type: str
        the SQLite data type for the column
    default_val: str
        a default value to use for the column; ``None`` means no DEFAULT clause is added

    Raises
    ------
    sqlite3.OperationalError
        if the statement fails for a reason other than the column already existing (for example the table does not exist)

    Notes
    -----
    ``table_name`` and ``col_type`` are placed into the SQL statement as-is, without quoting.
    """
    # quote the identifier and double any embedded quote so unusual names stay valid SQL
    quoted_col = '"{0}"'.format('{0}'.format(col_name).replace('"', '""'))
    sql_cmd = 'ALTER TABLE {0} ADD COLUMN {1} {2}'.format(table_name, quoted_col, col_type)
    # compare against None so that falsy defaults such as 0 are still applied
    if default_val is not None:
        escaped_default = '{0}'.format(default_val).replace("'", "''")
        sql_cmd = sql_cmd + " DEFAULT '{0}'".format(escaped_default)
    try:
        with conn:
            conn.cursor().execute(sql_cmd)
    except sqlite3.OperationalError as err:
        # an already-existing column is the expected, harmless case;
        # anything else is a real error and must not be hidden
        if 'duplicate column name' not in str(err).lower():
            raise
def sqlite_insert(conn, table_name, row, ignore = False, update = False, add_missing_cols = False):
    """
    Inserts a row into a table

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the table in which to insert the row
    row: dict
        a dictionary of key: value pairs corresponding to the column names and values of the items in the row to be added
    ignore: bool
        whether the entry should be ignored if it already exists in the table (uses ``INSERT OR IGNORE``)
    update: bool
        accepted for backwards compatibility; currently has no effect
    add_missing_cols: bool
        whether missing columns should be added to the table. Note: default column type will be used.

    Examples
    --------
    Example usage::

        row = {'key': key, 'value': val}
        sqlite_insert(conn = conn, table_name = vals_table_name, row = row, ignore = True)
    """
    if add_missing_cols:
        colnames = get_colnames(conn = conn, table_name = table_name)
        for key in row:
            if key not in colnames:
                add_column(conn = conn, table_name = table_name, col_name = key)

    # freeze the key order once so columns and values are guaranteed to line up
    keys = list(row)
    cols = ', '.join('"{0}"'.format('{0}'.format(key).replace('"', '""')) for key in keys)
    # positional placeholders: named ones (":col") break for column names
    # that are not valid parameter names, e.g. names containing spaces
    placeholders = ', '.join('?' for _ in keys)

    sql = 'INSERT '
    if ignore:
        sql = sql + 'OR IGNORE '
    sql = sql + 'INTO "{0}" ({1}) VALUES ({2})'.format(table_name, cols, placeholders)

    with conn:
        conn.cursor().execute(sql, [row[key] for key in keys])
def md5_str(item):
    """
    Gets the md5sum on the string representation of an object

    Parameters
    ----------
    item:
        Python object to get the md5 sum from; should be coercible to 'str'

    Returns
    -------
    str
        the hexadecimal md5 hash of ``str(item)``
    """
    text = str(item)
    # hashlib needs bytes: on Python 2 ``str`` already is bytes, on Python 3
    # it is text and has to be encoded first
    if not isinstance(text, bytes):
        text = text.encode('utf-8')
    return hashlib.md5(text).hexdigest()
def row_exists(conn, table_name, col_name, value):
    """
    Checks to see if a row exists in a table

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the table in which to search for the row
    col_name: str
        the name of the column to search for the value in
    value: str
        the value to use as a key to check if a row exists

    Returns
    -------
    bool
        ``True`` if at least one row has ``value`` in ``col_name``, otherwise ``False``

    Notes
    -----
    ``table_name`` and ``col_name`` are placed into the SQL statement as-is; only ``value`` is passed as a bound parameter.
    """
    # bind the value instead of formatting it into the statement: a
    # double-quoted value could be resolved by SQLite as a column name, and
    # embedded quotes would break (or alter) the query
    sql_cmd = 'SELECT count(*) FROM {0} WHERE {1} = ?'.format(table_name, col_name)
    with conn:
        count = conn.execute(sql_cmd, (value,)).fetchone()[0]
    return count != 0
def get_vals(conn, table_name, select_col, match_col, value):
    """
    Query values from the database

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the table in which to search for the row
    select_col: str
        the name of the column with the values to return
    match_col: str
        the name of the column to match a key to find values
    value: str
        the key to use for matching

    Returns
    -------
    list
        the ``select_col`` value of every row whose ``match_col`` equals ``value``

    Notes
    -----
    ``table_name``, ``select_col`` and ``match_col`` are placed into the SQL statement as-is; only ``value`` is passed as a bound parameter.

    Examples
    --------
    Example usage::

        get_vals(conn = conn, table_name = "runs", select_col = "samplesheet", match_col = "run", value = "180213_NB501073_0034_AHWJLLAFXX")
        # SELECT samplesheet FROM runs WHERE run = '180213_NB501073_0034_AHWJLLAFXX';
    """
    # the value is bound rather than formatted in, so quotes inside it cannot
    # break or alter the query
    sql_cmd = "SELECT {0} FROM {1} WHERE {2} = ?".format(select_col, table_name, match_col)
    with conn:
        data = conn.execute(sql_cmd, (value,)).fetchall()
    return [item[0] for item in data]
def dump_sqlite(conn, output_file):
    """
    Writes the contents of a database to a file as SQL text

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    output_file: str
        file to write SQLite output to
    """
    with open(output_file, "w") as handle:
        # one statement from ``iterdump`` per output line
        handle.writelines("{0}\n".format(statement) for statement in conn.iterdump())
def load_sqlite(conn, sqlite_file):
    """
    Runs the SQL statements stored in a file against a database

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    sqlite_file: str
        path to the file holding the SQL script to execute
    """
    with open(sqlite_file) as script_handle:
        script_text = script_handle.read()
    # the whole file is handed over as a single script
    conn.executescript(script_text)
def dump_csv(conn, table_name, output_file, delimiter = ',', quoting = csv.QUOTE_MINIMAL):
    """
    Writes every row of a table to a delimited text file, preceded by a header line

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the table to be dumped
    output_file: str
        the name of the file to write the table contents to
    delimiter: str
        field delimiter for output file
    quoting: csv.CONSTANT
        ``csv`` module constant to be used for quoting of the output file (https://docs.python.org/2/library/csv.html#csv.QUOTE_ALL)
    """
    header = get_colnames(conn = conn, table_name = table_name)
    db_cursor = conn.cursor()
    query = "SELECT * FROM {0}".format(table_name)
    with open(output_file, "w") as handle:
        writer = csv.DictWriter(handle, fieldnames = header, delimiter = delimiter, quoting = quoting)
        writer.writeheader()
        for record in db_cursor.execute(query):
            # pair each value with its column name for the DictWriter
            writer.writerow(dict(zip(header, record)))
def import_csv(conn, table_name, input_file, delimiter = ',', add_hash = False):
    """
    Imports a .csv file into a pre-existing SQLite database

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    table_name: str
        the name of the SQLite table to import data to
    input_file: str
        the name of the file to import data from
    delimiter: str
        field delimiter for the input file
    add_hash: bool
        adds a field labeled 'hash' with the md5sum of each entry

    Notes
    -----
    Column names are cleaned with ``sanitize_dict_keys`` before use, and columns missing from the table are added with type TEXT.
    """
    # columns already passed to add_column; avoids re-issuing an ALTER TABLE
    # for every field of every row
    known_cols = set()
    with open(input_file) as f:
        reader = csv.DictReader(f, delimiter = delimiter)
        for row in reader:
            # re-build dict with clean colname keys
            row = sanitize_dict_keys(d = row)
            # add entry hash
            if add_hash:
                # rows shorter than the header carry None for the missing
                # fields; treat those as empty strings so join() cannot fail
                row['hash'] = md5_str(''.join('' if value is None else value for value in row.values()))
            # add missing columns to db table
            for key in row:
                if key not in known_cols:
                    add_column(conn = conn, table_name = table_name, col_name = key, col_type = "TEXT")
                    known_cols.add(key)
            # add the entry to the db
            sqlite_insert(conn = conn, table_name = table_name, row = row)
def csv2sqlite(conn, input_file, table_name, delimiter = ','):
    """
    Imports a .csv file to a new database

    Parameters
    ----------
    conn: sqlite3.Connection object
        connection object to the database
    input_file: str
        the name of the file to import data from
    table_name: str
        the name of the SQLite table to import data to
    delimiter: str
        field delimiter for the input file

    Notes
    -----
    Only use this with a new empty SQLite database connection (no tables, no columns). Imports all fields as TEXT. Does not add primary keys. Sanitizes column names for use with SQLite. An input file without a header line is skipped without creating a table.

    Examples
    --------
    Example usage::

        import sqlite3
        from util import sqlite_tools as sqt
        input_file = "data.tsv"
        output_file = "data.sqlite"
        conn = sqlite3.connect(output_file)
        sqt.csv2sqlite(conn = conn, input_file = input_file, table_name = "mydata", delimiter = '\t')
    """
    with open(input_file) as f:
        reader = csv.DictReader(f, delimiter = delimiter)
        # an empty file has no header, so there is nothing to import
        if not reader.fieldnames:
            return
        # create table in conn if not present; the starting column uses the
        # same cleaned name that the rows will be inserted under
        if table_name not in get_table_names(conn):
            first_col = sanitize_str(reader.fieldnames[0])
            create_table(conn = conn, table_name = table_name, col_name = first_col, col_type = "TEXT", is_primary_key = False)
        # columns already passed to add_column; avoids re-issuing an
        # ALTER TABLE for every field of every row
        known_cols = set()
        for row in reader:
            # re-build dict with clean colname keys
            row = sanitize_dict_keys(d = row)
            # add missing columns to db table
            for key in row:
                if key not in known_cols:
                    add_column(conn = conn, table_name = table_name, col_name = key, col_type = "TEXT")
                    known_cols.add(key)
            # add the entry to the db
            sqlite_insert(conn = conn, table_name = table_name, row = row)
def sanitize_str(string):
    """
    Cleans a character string for use in the database as a header

    Surrounding whitespace is stripped; spaces, periods and colons become underscores; '#' characters are removed.
    """
    # (text to find, replacement) pairs, applied in this order
    substitutions = (
        (' ', '_'),
        ('.', '_'),
        (':', '_'),
        ('#', ''),
    )
    cleaned = string.strip()
    for target, replacement in substitutions:
        cleaned = cleaned.replace(target, replacement)
    return cleaned
def sanitize_dict_keys(d):
    """
    Builds a new dict whose keys are the 'cleaned' versions of the keys in ``d``, for use in the database as headers; values are carried over unchanged
    """
    cleaned = {}
    for key, value in d.items():
        cleaned[sanitize_str(key)] = value
    return cleaned