-
Notifications
You must be signed in to change notification settings - Fork 1
/
generate_data.py
126 lines (106 loc) · 4.58 KB
/
generate_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
""" Functions to generate data.
AUTHOR: Britta U. Westner <britta.wstnr[at]gmail.com>
Credit: parts of the functions are inspired by:
https://github.com/kingjr/jr-tools
"""
import numpy as np
# mne
import mne
from mne import read_forward_solution
from mne.cov import compute_covariance
from mne.datasets import sample
import nibabel as nib
import random
# my own scripts
from process_raw_data import compute_covariance # noqa
from matrix_transforms import stc_2_mgzvol # noqa
from plotting import plot_source_act # noqa
from simulations import generate_signal, simulate_evoked_osc # noqa
from source_space_decoding import get_pattern # noqa
from spatial_filtering import run_lcmv_epochs # noqa
from transforms import lcmvEpochs # noqa
def generate_data(label_names, n_trials, freqs, snr, pred_filter=True,
phase_lock=False, filt_def=None, loc='center'):
"""Simulate the data for the source decoding project.
Possibility to add noise on sensor level.
Simulations are based on the SAMPLE data set.
Be aware of several hard coded options since geared towards specific
project."""
# load specific stuff from the SAMPLE data set
data_path = sample.data_path() + '/MEG/sample/'
fwd_sim = read_forward_solution(data_path +
'sample_audvis-meg-eeg-oct-6-fwd.fif',
verbose=False)
# MRI
t1_fname = data_path + '../../subjects/sample/mri/T1.mgz'
mri_mgz = nib.load(t1_fname)
info = mne.io.read_info(data_path + 'sample_audvis-no-filter-ave.fif',
verbose=False)
info['sfreq'] = 250.
# labels
label_l = mne.read_label(data_path + 'labels/' + label_names[0])
label_r = mne.read_label(data_path + 'labels/' + label_names[1])
labels = [label_l, label_r]
if loc == 'center':
label_l.values[:] = 1.
label_r.values[:] = 1.
center_l = label_l.center_of_mass(subject='sample')
center_r = label_r.center_of_mass(subject='sample')
locations = (center_l * 3, center_r * 3) # 3 orientations
elif loc == 'random':
locations = (random.choice(range(len(label_l) * 3)),
random.choice(range(len(label_r) * 3)))
else:
raise ValueError('loc has to be "random" or "center", got %s.' % loc)
# simulate the data
X = []
sim_coords = []
evokeds = []
mu = None
for label, loc, freq in zip(labels, locations, freqs):
if pred_filter is True:
if filt_def is None:
raise ValueError("Specify filtering boundaries.")
else:
filtering = {"hp": filt_def[0], "lp": filt_def[1],
"fir_design": "firwin"}
else:
filtering = None
evoked, stc, x_loc, mu = simulate_evoked_osc(info, fwd_sim,
n_trials=n_trials,
freq=freq, label=[label],
loc_in_label=loc,
picks='mag',
snr=snr, mu=mu,
loc_seed=loc,
return_matrix=True,
filtering=filtering,
phase_lock=phase_lock,
noise_type='white')
# data
X.append(x_loc)
# coordinates
vert_tmp = [stc.vertices[x] for x in [0, 1]
if stc.vertices[x].size != 0]
src_coords_tmp = [fwd_sim['src'][x]['rr'] for x in [0, 1]
if stc.vertices[x].size != 0]
coord_sim = src_coords_tmp[0][vert_tmp[0]]
coord_mgz = stc_2_mgzvol(coord_sim[0], fwd_sim, mri_mgz)
sim_coords.append(coord_mgz)
evokeds.append(evoked)
# get X and y
X = np.vstack(X)
y = np.r_[np.zeros(n_trials), np.ones(n_trials)]
return X, y, sim_coords, evokeds
def get_power_or_erp(X, data_obj, phase_lock, power_win, time_point=0.):
if phase_lock is True:
time_idx = data_obj[0].time_as_index(time_point)
# we choose one time_point
time_idx = data_obj[0].time_as_index(time_point)
X_out = X[:, :, time_idx[0]]
return X_out, time_idx
else:
# power
power_idx = data_obj[0].time_as_index(power_win)
X_out = np.mean(X[:, :, power_idx[0]:power_idx[1]]**2, axis=2)
return X_out