-
Notifications
You must be signed in to change notification settings - Fork 1
/
process_raw_data.py
259 lines (212 loc) · 8.14 KB
/
process_raw_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
"""Functions related to processing of sensor space data.
AUTHOR: Britta U. Westner
"""
import mne
import numpy as np
from mne import find_events, Epochs
from mne.io import Raw
from helper_functions import check_rank_cov_matrix
def read_run(raw_fname, run_num):
"""Read data from run run_num from disk.
Reads both maxfiltered and raw data.
Parameters:
-----------
raw_fname : string
path to the raw file with place holder for the run number
run_num : int
run number to be filled into the path
Returns
-------
raw : Raw object
"""
if '-sss.fif' in raw_fname:
# maxfiltered data:
run_fname = raw_fname.format(str(run_num))
raw = Raw(run_fname, preload=True)
else:
# no maxfilter:
run_fname = raw_fname.format(str(run_num))
raw = Raw(run_fname, preload=True, allow_maxshield=True)
return raw
def filter_epoching(raw, freqs, filt_transwidth, baseline, t_win,
resamp_rate=None, mag=False):
"""Filter and epoch data for ERP analysis.
Filter, epoch and eventually downsample data.
NOTE: This function has values specific to the experiment hard-coded,
adjust before using for something else!!!
Parameters:
-----------
raw : Raw
raw data of one run.
freqs : tuple or list (len=2)
lower and upper limit for filtering (successive HP and LP filter)
filt_transwidth : tuple or list (len=2)
transition bandwidth for FIR filters (HP and LP).
baseline : tuple (len=2)
baseline limits.
t_win : tuple or list (len=2)
time window for epoching.
resamp_rate : None | float
resampling rate, if not None, data will be resampled
mag : bool
whether only magnetometers should be considered, defaults to False.
Returns
-------
events :
events used for epoching.
epochs_filtered : Epochs
filtered epochs
"""
# event business comes first - allows to get rid of stim channel later
# TODO: make this smarter and easier to control
events = find_events(raw, stim_channel='STI101', shortest_event=1)
sel = np.where(events[:, 2] <= 255)[0]
events = events[sel, :]
# Compensate for delay (as measured manually with photodiode)
events[:, 0] += int(.050 * raw.info['sfreq'])
# pick channels:
if mag is True:
raw.pick_types(meg='mag', eog=False, stim=False)
else:
raw.pick_types(meg=True, eog=False, stim=False)
# filtering
(str(kk) for kk in freqs) # handle the possibility of None type
print('Filtering from %s to %s Hz.' % (freqs[0], freqs[1]))
raw.filter(freqs[0], freqs[1], n_jobs=1,
l_trans_bandwidth=filt_transwidth[0],
h_trans_bandwidth=filt_transwidth[1],
fir_design='firwin')
# epoching
print('Epoching data.')
epochs_filtered = Epochs(raw, events, proj=False, tmin=t_win[0],
tmax=t_win[1], baseline=baseline, preload=True)
epochs_filtered._raw = None # memory leakage
if resamp_rate is not None:
print('Resampling data at %i Hz.' % resamp_rate)
epochs_filtered.resample(resamp_rate)
return events, epochs_filtered
def hilbert_epoching(raw, freqs, filt_transwidth, baseline, t_win, resamp_rate,
mag=False):
"""Get data ready for hilbert beamformer.
Filter, hilbertize, and epoch the data. Returns BP-filtered as well as
hilbertized data.
NOTE: This function has values specific to the experiment hard-coded,
adjust before using for something else!!!
Parameters:
-----------
raw : Raw
raw data of one run.
freqs : tuple or list (len=2)
lower and upper limit for BP-filtering.
filt_transwidth : float
transition bandwidth for FIR filter.
baseline : tuple (len=2)
baseline limits.
t_win : tuple or list (len=2)
time window for epoching.
resamp_rate : float
resampling rate, only hilbertized data will be resampled.
mag : bool
whether only magnetometers should be considered, defaults to False.
Returns
-------
events :
events used for epoching.
epochs_filtered : Epochs
bandpass-filtered epochs for covariance matrix estimation.
epochs_hilbert : Epochs
hilbertized epochs, resampled!.
"""
# event business comes first - allows to get rid of stim channel later
# TODO: make this smarter and easier to control
events = find_events(raw, stim_channel='STI101', shortest_event=1)
sel = np.where(events[:, 2] <= 255)[0]
events = events[sel, :]
# Compensate for delay (as measured manually with photodiode)
events[:, 0] += int(.050 * raw.info['sfreq'])
# pick channels:
if mag is True:
raw.pick_types(meg='mag', eog=False, stim=False)
else:
raw.pick_types(meg=True, eog=False, stim=False)
# filtering and hilbertizing
print('Bandpass-filtering and hilbertizing for %i to %i Hz.'
% (freqs[0], freqs[1]))
raw.filter(freqs[0], freqs[1], n_jobs=1, l_trans_bandwidth=filt_transwidth,
h_trans_bandwidth=filt_transwidth, fir_design='firwin')
raw_hilbert = raw.copy().apply_hilbert(n_jobs=1, envelope=False)
# epoching
print('Epoching data.')
epochs_filtered = Epochs(raw, events, proj=False, tmin=t_win[0],
tmax=t_win[1], baseline=baseline, preload=True)
epochs_filtered._raw = None # memory leakage
epochs_hilbert = Epochs(raw_hilbert, events, proj=False, tmin=t_win[0],
tmax=t_win[1], baseline=baseline,
preload=True).resample(resamp_rate)
epochs_hilbert._raw = None # memory leakage
return events, epochs_filtered, epochs_hilbert
def compute_covariance(epochs, t_win, noise=False, t_win_noise=None,
check=True, plot=False):
"""Compute data and noise covariance matrix.
Computes the data covariance matrix and if desirable also the noise
covariance matrix from the same data. Evaluates the goodness of the matrix
and plots it.
Parameters:
-----------
epochs : epochs
epochs to compute the covariance matrices on.
t_win : tuple or list
time window to compute data covariance matrix on.
noise : bool
whether noise covariance matrix should be computed.
t_win_noise : tuple or list
time window to compute noise covariance matrix on.
check : bool
whether the covariance matrices' goodness should be checked.
Returns
-------
data_cov : MNE CovMat
data covariance matrix
noise_cov : MNE CovMat
noise covariance matrix, returns None if not computed.
"""
data_cov = mne.compute_covariance(epochs, tmin=t_win[0], tmax=t_win[1])
if check is True:
print('Data covariance matrix:')
check_rank_cov_matrix(data_cov, epochs)
if noise is True:
noise_cov = mne.compute_covariance(epochs, tmin=t_win_noise[0],
tmax=t_win_noise[1])
if check is True:
print('Noise covariance matrix:')
check_rank_cov_matrix(noise_cov, epochs)
# return whatever needs to be returned
if noise is True:
return data_cov, noise_cov
else:
return data_cov
def compute_snr(evoked, t_baseline, t_signal, label):
"""Computes the signal-to-noise ratio on an evoked signal.
Computes the signal-to-noise ratio (SNR) on an evoked signal, using the
root-mean-square.
Parameters:
-----------
evoked : evoked
evoked data to compute SNR on
t_baseline : tuple or list
time window to use as noise.
t_signal : tuple or list
time window to use as signal.
label : list of integers
the channels to include in the computation.
Returns
-------
snr : float
signal-to-noise ratio
"""
bl = evoked.time_as_index(t_baseline)
sig = evoked.time_as_index(t_signal)
rms_bl = np.sqrt(np.mean(evoked.data[label, bl[0]:bl[1]] ** 2))
rms_sig = np.sqrt(np.mean(evoked.data[label, sig[0]:sig[1]] ** 2))
snr = ((rms_sig - rms_bl) / rms_bl)
return snr