Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bss and filter functions #23

Open
wants to merge 14 commits into
base: staging
Choose a base branch
from
Empty file added git
Empty file.
2 changes: 1 addition & 1 deletion matlab/tools/bss/calculate_time_lags.m
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@
T1(t) = t + prd + I - wlen - 1;
end

T0 = 1:NN;
T0 = 1:NN;
3 changes: 3 additions & 0 deletions python/src/oset/ecg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# __init__.py
from .synchronous_phase_samples import synchronous_phase_samples
from .phase_calculator import phase_calculator
12 changes: 12 additions & 0 deletions python/src/oset/ecg/bss/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from .calculate_time_lags import calculate_time_lags
from .easi_source_separation import easi_source_separation
from .mica_projectors import mica_projectors
from .nonstationary_component_analysis import nonstationary_component_analysis
from .pica_matrices import pica_matrices
from .spectral_component_analysis_dft import spectral_component_analysis_dft






67 changes: 67 additions & 0 deletions python/src/oset/ecg/bss/calculate_time_lags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import argparse
import numpy as np

def calculate_time_lags(peaks, phase):
"""
Calculate time lags for the pseudo-periodic component analysis algorithm (PiCA)
using the R-peaks and the ECG phase signal.

Parameters:
peaks (array-like): Vector containing the indices of detected peaks in the input signal.
phase (array-like): Vector representing the phase values associated with the peaks (see phase_calculator).

Returns:
T0 (numpy.ndarray): Vector containing the time lags for the input peaks.
T1 (numpy.ndarray): Vector containing the corresponding lagged time points.

Reference:
R. Sameni, C. Jutten, and M. B. Shamsollahi. Multichannel
electrocardiogram decomposition using periodic component analysis. IEEE
Transactions on Biomedical Engineering, 55(8):1935-1940, Aug. 2008.

Revision History:
June 2024: Translated to Python from Matlab (calculate_time_lags.m)

Muhammad Ubadah Tanveer, 2024
The Open-Source Electrophysiological Toolbox
https://github.com/alphanumericslab/OSET
"""

J = np.where(peaks)[0]
n1 = np.diff(J)
prd = round(np.mean(n1))
wlen = max(n1) - min(n1)

T1 = np.zeros(len(peaks) - prd - wlen)
NN = len(T1)

for t in range(1, NN + 1):
df = np.abs(phase[t-1] - phase[t + prd - wlen - 1 : t + prd + wlen])
I = np.argmin(df)
T1[t-1] = t + prd + I - wlen

T0 = np.arange(1, NN + 1)

return T0, T1

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""
Calculate time lags for the pseudo-periodic component analysis algorithm (PiCA)
using the R-peaks and the ECG phase signal.

Parameters:
peaks (array-like): Vector containing the indices of detected peaks in the input signal.
phase (array-like): Vector representing the phase values associated with the peaks (see phase_calculator).

Returns:
T0 (numpy.ndarray): Vector containing the time lags for the input peaks.
T1 (numpy.ndarray): Vector containing the corresponding lagged time points.

Revision History:
June 2024: Translated to Python from Matlab (calculate_time_lags.m)

""",
formatter_class=argparse.RawTextHelpFormatter,
)
args = parser.parse_args()
116 changes: 116 additions & 0 deletions python/src/oset/ecg/bss/easi_source_separation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import argparse
import numpy as np

def easi_source_separation(x, nsou, lambda_, nlintype):
"""
easi_source_separation - An adaptive blind source separation algorithm using the EASI (equivariant adaptive separation by independence) algorithm

Parameters:
x : ndarray
Input signal as a matrix of size (ncap x T), where ncap is the number of input channels and T is the length of the signal
nsou : int
Number of sources
lambda_ : float
Adaptation parameter
nlintype : int
Nonlinearity type for the adaptation (y2 denotes y**2)
1: g = np.diag(y2) * y[:, t]
2: g = y[:, t] * np.diag(0.1 * y2) * np.diag(y2)
3: g = y[:, t] * np.sqrt(np.diag(y2))
4: g = y[:, t] * np.log(np.diag(y2))
5: g = -y[:, t] * (np.diag(y2) < 0.9)
6: g = y[:, t] / np.log(np.diag(y2))
7: g = -y[:, t] / np.sqrt(np.diag(y2))
8: g = -y[:, t] / np.diag(y2)

Returns:
y : ndarray
Separated sources as a matrix of size (nsou x T)
B : ndarray
Separation matrix of size (nsou x ncap)
conv : ndarray
Convergence of the algorithm at each iteration (1 x T)
References:
Beate Laheld and Jean-François Cardoso, "Adaptive source separation without prewhitening," Proc. EUSIPCO'94, 183-186, Edinburgh, Sep. 1994.

Revision History:
June 2024: Translated to Python from Matlab (easi_source_separation.m)

Muhammad Ubadah Tanveer, 2024
The Open-Source Electrophysiological Toolbox
https://github.com/alphanumericslab/OSET
"""
ncap, T = x.shape # Number of input channels and length of the signal
idsou = np.eye(nsou) # Identity matrix for nsou sources

B = np.random.randn(nsou, ncap) # Initialization of the separation matrix
y = np.zeros((nsou, T)) # Initialization of the separated sources
conv = np.zeros(T) # Initialization of the convergence vector

for t in range(T):
y[:, t] = B @ x[:, t] # Separation of sources using the separation matrix
y2 = y[:, t] @ y[:, t] # Square of the separated sources

# Nonlinearity adaptation based on the specified nlintype
if nlintype == 1:
g = np.diag(y2) @ y[:, t]
elif nlintype == 2:
g = y[:, t] @ np.diag(0.1 * y2) @ np.diag(y2)
elif nlintype == 3:
g = y[:, t] @ np.sqrt(np.diag(y2))
elif nlintype == 4:
g = y[:, t] @ np.log(np.diag(y2))
elif nlintype == 5:
g = -y[:, t] @ (np.diag(y2) < 0.9)
elif nlintype == 6:
g = y[:, t] / np.log(np.diag(y2))
elif nlintype == 7:
g = -y[:, t] / np.sqrt(np.diag(y2))
elif nlintype == 8:
g = -y[:, t] / np.diag(y2)

gy = g @ y[:, t] # Inner product of the nonlinearity adaptation and the separated source
G = (y2 - idsou) / (1 + lambda_ * np.trace(y2)) + (gy - gy.T) / (1 + lambda_ * np.abs(g.T @ y[:, t])) # Update matrix G
B = B - lambda_ * G @ B # Update the separation matrix using matrix G
conv[t] = np.linalg.norm(G) # Store the convergence value at each iteration

return y, B, conv

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""
easi_source_separation - An adaptive blind source separation algorithm using the EASI (equivariant adaptive separation by independence) algorithm

Parameters:
x : ndarray
Input signal as a matrix of size (ncap x T), where ncap is the number of input channels and T is the length of the signal
nsou : int
Number of sources
lambda_ : float
Adaptation parameter
nlintype : int
Nonlinearity type for the adaptation (y2 denotes y**2)
1: g = np.diag(y2) * y[:, t]
2: g = y[:, t] * np.diag(0.1 * y2) * np.diag(y2)
3: g = y[:, t] * np.sqrt(np.diag(y2))
4: g = y[:, t] * np.log(np.diag(y2))
5: g = -y[:, t] * (np.diag(y2) < 0.9)
6: g = y[:, t] / np.log(np.diag(y2))
7: g = -y[:, t] / np.sqrt(np.diag(y2))
8: g = -y[:, t] / np.diag(y2)

Returns:
y : ndarray
Separated sources as a matrix of size (nsou x T)
B : ndarray
Separation matrix of size (nsou x ncap)
conv : ndarray
Convergence of the algorithm at each iteration (1 x T)

Revision History:
June 2024: Translated to Python from Matlab (easi_source_separation.m)
""",
formatter_class=argparse.RawTextHelpFormatter,
)
args = parser.parse_args()

83 changes: 83 additions & 0 deletions python/src/oset/ecg/bss/mica_projectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import argparse
import numpy as np

def mica_projectors(A):
"""
mica_projectors - Multidimensional Independent Component Analysis (MICA) projection matrices.

Description:
Given an nxn matrix A, which has been estimated by, for example,
independent component analysis of mixtures following the model x = A*s,
this function provides the projectors that can be used to decompose x
into its linear decomposition as follows: x = \sigma_p x_p. Using the
projectors we can find: x_p = P_tilde[p] @ x. See the reference below
for details and notations.

Parameters:
A : ndarray
An nxn matrix representing the estimated mixing matrix in the ICA model.

Returns:
P_tilde : list of ndarray
A list of n projection matrices (each n x n) for each independent component.
P : list of ndarray
A list of n projection matrices (each n x n) used to create P_tilde.

Reference:
Cardoso, J-F. "Multidimensional independent component analysis." In
Proceedings of the 1998 IEEE International Conference on Acoustics,
Speech and Signal Processing, ICASSP'98 (Cat. No. 98CH36181), vol. 4,
pp. 1941-1944. IEEE, 1998. doi: 10.1109/ICASSP.1998.681443

Revision History:
June 2024: Translated to Python from Matlab (mica_projectors.m)

Muhammad Ubadah Tanveer, 2024
The Open-Source Electrophysiological Toolbox
https://github.com/alphanumericslab/OSET
"""

n = A.shape[1] # Get the number of columns in A (assumes square matrix).

P = [None] * n # Initialize a list for P.
S = np.zeros((n, n)) # Initialize S as an n x n matrix of zeros.

for k in range(n):
# Calculate the projection matrix P[k] for each independent component.
P[k] = np.outer(A[:, k], A[:, k]) / (A[:, k].T @ A[:, k])
S += P[k] # Accumulate P[k] into S.

P_tilde = [None] * n # Initialize a list for P_tilde.
S_inv = np.linalg.pinv(S) # Calculate the pseudo-inverse of S.

for k in range(n):
# Calculate P_tilde by multiplying each P[k] with S_inv.
P_tilde[k] = P[k] @ S_inv

return P_tilde, P

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""
mica_projectors - Multidimensional Independent Component Analysis (MICA) projection matrices.

Syntax: P_tilde, P = mica_projectors(A)

Parameters:
A : ndarray
An nxn matrix representing the estimated mixing matrix in the ICA model.

Returns:
P_tilde : list of ndarray
A list of n projection matrices (each n x n) for each independent component.
P : list of ndarray
A list of n projection matrices (each n x n) used to create P_tilde.

Revision History:
June 2024: Translated to Python from Matlab (mica_projectors.m)

""",
formatter_class=argparse.RawTextHelpFormatter,
)
args = parser.parse_args()

Loading