-
Notifications
You must be signed in to change notification settings - Fork 0
/
functions.py
168 lines (153 loc) · 7.19 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import json
import urllib
import urllib.request

import numpy as np
import pandas as pd
from scipy import stats
## COMMON FUNCTIONS
# PRINT all errors TO errorLog function placeholder used by web checker to print to multiple places
def errorLog(message):
    """Emit a checker message.

    Placeholder logging hook: the web checker swaps this out to fan the
    message to multiple destinations; here it just prints to stdout.
    """
    print(message)
def dcAddErrorToList(error_column, row, error_to_add, df):
    """Record *error_to_add* for *row* in column *error_column* of *df*.

    Also stamps the 'row' column with the row number. If an error is
    already present in the cell, the new one is appended comma-separated.
    Returns the mutated DataFrame.

    NOTE(review): `.at`/`.iloc` (label-based) and `.iat` (positional) are
    mixed here, so this assumes the index labels match positions (a plain
    RangeIndex) — confirm with callers.
    """
    idx = int(row)
    df.at[idx, 'row'] = str(row)
    if error_column not in df.columns:
        # column does not exist yet: first error for this column
        df.at[idx, error_column] = error_to_add
        errorLog("Row: %s, Error To Add: %s" % (idx, error_to_add))
        return df
    col_pos = df.columns.get_loc(error_column)
    if pd.isnull(df.iat[idx, col_pos]):
        # cell is empty (NaN): store the error directly
        df.at[idx, error_column] = error_to_add
    elif str(df.at[idx, error_column]):
        # a previous non-empty error is recorded: append to it
        df.at[idx, error_column] = str(df.iloc[idx, col_pos]) + "," + error_to_add
    else:
        # cell held an empty (but non-null) value: overwrite it
        df.at[idx, error_column] = error_to_add
    errorLog("Row: %s, Error To Add: %s" % (idx, error_to_add))
    return df
def checkSummary(statement, column, warn_or_error, error_label, human_error, df):
    """Attach one JSON-encoded error record to every row listed in *statement*.

    *statement* is an iterable of row numbers; each gets the same record
    (column, error type, human-readable message) written via dcAddErrorToList.
    """
    # the record text is identical for every flagged row, so build it once
    record = '{"column": "%s", "error_type": "%s", "error": "%s"}' % (column, warn_or_error, human_error)
    for row_number in statement:
        dcAddErrorToList(error_label, row_number, record, df)
## CHECKS ##
def checkData(statement, column, warn_or_error, error_label, human_error, dataframe):
    """Flag each row number in *statement* with a JSON-encoded error record.

    Same shape as checkSummary/checkLogic; kept separate so data checks
    can diverge later without touching the other check types.
    """
    for flagged_row in statement:
        payload = '{"column": "%s", "error_type": "%s", "error": "%s"}' % (column, warn_or_error, human_error)
        dcAddErrorToList(error_label, flagged_row, payload, dataframe)
def checkLogic(statement, column, warn_or_error, error_label, human_error, dataframe):
    """Logic-check variant of checkData: record one error per listed row.

    Builds the same JSON-style record and hands it to dcAddErrorToList
    under *error_label* for every row number in *statement*.
    """
    template = '{"column": "%s", "error_type": "%s", "error": "%s"}'
    for bad_row in statement:
        dcAddErrorToList(error_label, bad_row, template % (column, warn_or_error, human_error), dataframe)
def dcValueAgainstMultipleValues (field,ucfield,listname,listfield,df):
    """Validate a (possibly comma-separated) code column against a published lookup list.

    author - Jordan Golemo
    example: field = qacode, ucfield = QACode, listname = lu_toxtestacceptability,
             listfield = testacceptability, df = results

    Fetches the lookup list from the SCCWRP ArcGIS feature service, then for
    each submitted cell splits on commas and flags (via checkData):
      - an error requiring a code when the cell was empty (NaN), and
      - an error naming any individual code not found in the lookup list.
    """
    # get published lookup list values
    url_search = "https://gis.sccwrp.org/arcgis/rest/services/bight2018%s/FeatureServer/0/query?where=1=1&returnGeometry=false&outFields=*&f=json" % listname
    # FIX: urllib.urlopen does not exist in Python 3 - use urllib.request.urlopen
    jsonurl = urllib.request.urlopen(url_search).read()
    jsonlist = json.loads(jsonurl)
    # lookup list values pulled from the service response
    list_of_codes = [feature['attributes'][listfield] for feature in jsonlist['features']]
    # submitted field to check against lookup list; NaN becomes the sentinel 'empty'
    df_search = df[field].fillna('empty')
    for i in range(len(df_search)):
        # submitted individual field/cell values are separated by commas like: A,B,C
        for code in df_search[i].replace(',', ' ').split():
            # find out if individual element is not in url lookup list
            if code not in list_of_codes:
                invalid_code = df_search[i]
                if code == 'empty':
                    # cell was NaN: a code is required here
                    checkData(df[df[field].isnull()].tmp_row.tolist(),ucfield,'Toxicity Error','error','A code is required: <a href=http://checker.sccwrp.org/checker/scraper?action=help&layer=%s target=_blank>%s</a>' % (listname,listname),df)
                else:
                    # flag every row whose full cell value matches this invalid entry
                    checkData(df.loc[df[field]==invalid_code].tmp_row.tolist(),ucfield,'Toxicity Error','error','You have submitted an invalid code: %s. Please see lookup list: <a href=http://checker.sccwrp.org/checker/scraper?action=help&layer=%s target=_blank>%s</a>' % (code,listname,listname),df)
# -- Stats functions -- #
def getCalculatedValues(grp):
    """Add summary-statistic columns to a result group and return it.

    Broadcasts group-level scalars onto every row of *grp*:
      mean / stddev        - of the 'result' column
      n                    - sum of 'fieldreplicate' (replicate count)
      variance             - stddev squared
      coefficientvariance  - stddev as a percent of the mean (CV%)
    """
    grp['mean'] = grp['result'].mean()
    grp['n'] = grp['fieldreplicate'].sum()
    grp['stddev'] = grp['result'].std()
    # FIX: vectorized square instead of a per-element Python .apply(lambda ...)
    grp['variance'] = grp['stddev'] ** 2
    grp['coefficientvariance'] = (grp['stddev'] / grp['mean']) * 100
    return grp
def getPctControl(row, control_mean_dict):
    """Set row['pctcontrol'] = result mean as a percent of its batch control mean.

    Control samples (sampletypecode 'CNEG') are pinned to 100. Other rows
    look up their toxbatch in *control_mean_dict*; rows whose batch has no
    control mean get NaN. Returns the mutated row.
    """
    if row['sampletypecode'] == 'CNEG':
        ## toxbatch control should always be 100
        row['pctcontrol'] = 100
    elif row['toxbatch'] in control_mean_dict:
        # divide the result mean by the control mean for this batch, times 100
        row['pctcontrol'] = ((row['mean'] / control_mean_dict[row['toxbatch']]) * 100)
    else:
        # FIX: np.NaN alias was removed in NumPy 2.0 - use np.nan
        row['pctcontrol'] = np.nan
    return row
## author - Tyler Vu
def getPValue(summary):
    """Run a one-tailed Welch's t-test per row: batch control vs. station results.

    For every row, compares the CNEG (control) results of that row's toxbatch
    against the results for that row's station within the same batch, writing
    'tstat', 'pvalue' (one-tailed, p/2) and 'sigeffect' ('SC' significant,
    'NSC' not) back onto *summary* in place.

    author - Tyler Vu
    """
    # FIX: Series.iteritems() was removed in pandas 2.0 - use .items()
    for index, batch in summary['toxbatch'].items():
        station_code = summary.iloc[index, summary.columns.get_loc('stationid')]
        # FIX: select the 'result' Series (not a one-column DataFrame) so
        # ttest_ind returns scalars rather than length-1 arrays; this makes
        # the .at assignments and the t < 0 test well-defined scalars.
        cneg_result = summary['result'].where(
            (summary['sampletypecode'] == 'CNEG') & (summary['toxbatch'] == batch)).dropna()
        result_both = summary['result'].where(
            (summary['toxbatch'] == batch) & (summary['stationid'] == station_code)).dropna()
        t, p = stats.ttest_ind(cneg_result, result_both, equal_var=False)
        errorLog(summary.iloc[index])
        summary.at[index, 'tstat'] = t
        single_tail = p / 2
        summary.at[index, 'pvalue'] = single_tail  # divide by 2 to make it 1-tailed
        if t < 0:
            # control mean below results: no significant (toxic) effect
            summary.at[index, 'sigeffect'] = 'NSC'
        elif single_tail <= .05:
            summary.at[index, 'sigeffect'] = 'SC'
        else:
            summary.at[index, 'sigeffect'] = 'NSC'
## author - Tyler Vu
def getSQO(grp):
    """Assign grp['sqocategory'] from species-specific SQO toxicity thresholds.

    Two supported species, each with a mean cutoff (below which the sample
    is potentially toxic) and two percent-of-control cutoffs; 'sigeffect'
    ('NSC'/'SC') breaks the ties. Any other species gets None.

    author - Tyler Vu
    """
    # species -> (mean cutoff, pct-of-control low cutoff, pct-of-control high cutoff)
    thresholds = {
        'Eohaustorius estuarius': (90, 82, 59),
        'Mytilus galloprovincialis': (80, 77, 42),
    }
    spec = thresholds.get(grp['species'])
    if spec is None:
        # unrecognized species: no category
        grp['sqocategory'] = None
        return grp
    mean_cut, pct_low, pct_high = spec
    # comparison directions kept as '<' so NaN values fall through to Nontoxic
    if grp['mean'] < mean_cut:
        if grp['pctcontrol'] < pct_low:
            if grp['pctcontrol'] < pct_high:
                grp['sqocategory'] = 'High Toxicity'
            else:
                grp['sqocategory'] = 'Low Toxicity' if grp['sigeffect'] == 'NSC' else 'Moderate Toxicity'
        else:
            grp['sqocategory'] = 'Nontoxic' if grp['sigeffect'] == 'NSC' else 'Low Toxicity'
    else:
        grp['sqocategory'] = 'Nontoxic'
    return grp