Module signal_quality.denoising
Expand source code
import torch
from tqdm import tqdm
import pywt
import matplotlib.pyplot as plt
import emd
import numpy as np
import scipy
import sys
import emd
import neurokit2 as nk
import datasets
# print(torch.__version__)
### ===================================================== Neural Networks =====================================================
class Net(torch.nn.Module):
    """Simple MLP with a scikit-learn-like ``fit``/``predict`` interface.

    Parameters
    ----------
    h_sizes: Layer widths; ``h_sizes[0]`` is the input width and
        ``h_sizes[-1]`` the output width.  One Linear + activation pair is
        created per consecutive pair of sizes (the activation follows the
        final Linear too, preserving the original design).
    activation: Activation module inserted after every Linear layer.
    """
    def __init__(self, h_sizes=[768, 256, 64, 2], activation=torch.nn.LeakyReLU()):
        super(Net, self).__init__()
        # Train on GPU when available; batches are moved there one at a time.
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.layers = torch.nn.ModuleList()
        for k in range(len(h_sizes)-1):
            self.layers.append(torch.nn.Linear(h_sizes[k], h_sizes[k+1]))
            self.layers.append(activation)
        self.to(self.device)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

    def fit(self, x_train, y_train, batch_size=32, epochs=10, criterion=torch.nn.MSELoss(), lr=1e-3, weight_decay=1e-4, shuffle=True, seed=0):
        """Train with Adam on numpy arrays ``x_train``/``y_train`` in place."""
        assert isinstance(x_train, np.ndarray) and isinstance(y_train, np.ndarray)
        np.random.seed(seed)
        x_train = torch.from_numpy(x_train).float()
        y_train = torch.from_numpy(y_train).float()
        optimizer = torch.optim.Adam(self.parameters(), lr=lr, weight_decay=weight_decay)
        self.train()  # ensure BatchNorm/Dropout subclasses are in train mode
        print(self.device)
        for epoch in tqdm(range(epochs)):
            if shuffle:
                # Same permutation stream as arange+shuffle under the fixed seed.
                inds = np.random.permutation(len(x_train))
                x_train = x_train[inds]
                y_train = y_train[inds]
            for i in range(0, len(x_train), batch_size):
                batch_x = x_train[i:i+batch_size].to(self.device)
                batch_y = y_train[i:i+batch_size].to(self.device)
                loss = criterion(self.forward(batch_x), batch_y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                if i % 10 == 0:  # NOTE: i advances by batch_size, so this logs sparsely
                    tqdm.write("loss: {:.6f}".format(loss.detach().cpu().numpy()))

    def predict(self, x, batch_size=200):
        """Run batched inference on a numpy array; returns a numpy array.

        Fix: switch to eval mode for the duration of the call so that
        BatchNorm/Dropout layers in subclasses use running statistics
        instead of per-batch statistics, then restore the previous mode.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                x = torch.from_numpy(x).float()
                out = []
                for i in range(0, len(x), batch_size):
                    batch_x = x[i:i+batch_size].to(self.device)
                    out.append(self.forward(batch_x).cpu().numpy())
                return np.concatenate(out)
        finally:
            self.train(was_training)
class LSTMNet(Net):
    """LSTM sequence regressor: per-timestep features -> per-timestep output.

    Input is (batch, seq_len, input_dim); output is (batch, seq_len) after
    the trailing squeeze (``output_dim`` defaults to 1).
    """
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim=1, activation=torch.nn.LeakyReLU()):
        # NOTE(review): Net.__init__ also builds its default MLP layers here;
        # they are unused by this forward pass but remain registered (and are
        # therefore part of saved checkpoints), so the call is kept as-is.
        super(LSTMNet, self).__init__()
        self.layer_dim = layer_dim
        self.hidden_dim = hidden_dim
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.activation = activation
        self.lstm = torch.nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = torch.nn.Linear(hidden_dim, output_dim)
        self.to(self.device)

    def forward(self, x):
        # Fresh zero-valued hidden and cell states for every call, detached
        # from any graph (truncated BPTT: no gradient flows into the
        # constant initial state).
        state_shape = (self.layer_dim, x.size(0), self.hidden_dim)
        init_h = torch.zeros(state_shape).to(self.device)
        init_c = torch.zeros(state_shape).to(self.device)
        seq_out, _ = self.lstm(x, (init_h, init_c))
        # seq_out: (batch, seq_len, hidden_dim)
        projected = self.fc(self.activation(seq_out))
        # Drop the trailing singleton channel: (batch, seq_len, 1) -> (batch, seq_len).
        return projected.squeeze()
class Conv1DNet(Net):
    """Stack of same-padded Conv1d blocks over (batch, seq_len, channels) input."""
    def __init__(self, h_sizes=[6, 768, 256, 64, 1], activation=torch.nn.LeakyReLU()):
        super(Conv1DNet, self).__init__()
        self.layers = torch.nn.ModuleList()
        last = len(h_sizes) - 1
        for k in range(last):
            self.layers.append(torch.nn.Conv1d(in_channels=h_sizes[k], out_channels=h_sizes[k+1], kernel_size=5, padding='same'))
            self.layers.append(activation)
            # NOTE(review): k never reaches len(h_sizes)-1 inside this loop, so
            # this guard is always true and a BatchNorm follows *every* conv,
            # including the final 1-channel output; `k != len(h_sizes)-2` was
            # probably intended, but fixing it would invalidate saved
            # checkpoints, so the original behaviour is preserved.
            if k != last:
                self.layers.append(torch.nn.BatchNorm1d(h_sizes[k+1]))
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.to(self.device)

    def forward(self, x):
        # (batch, seq_len, channels) -> (batch, channels, seq_len) for Conv1d.
        x = torch.permute(x, (0, 2, 1))
        for layer in self.layers:
            x = layer(x)
        # Remove the singleton channel dim (and the batch dim when batch==1).
        return x.squeeze()
class Conv1DAutoencoderNet(Net):
    """Strided Conv1d encoder + ConvTranspose1d decoder for 1-D signals.

    Bug fix: the original appended the decoder's activations to
    ``self.encoder`` instead of ``self.decoder``, so the encoder applied the
    activation several extra times after its last BatchNorm while the decoder
    contained no non-linearities at all.  The activations now live in the
    decoder, between each transposed conv and its BatchNorm.
    NOTE(review): this shifts the ModuleList indices of the decoder's
    parameterised layers, so checkpoints saved with the buggy layout will
    not load into this version.
    """
    def __init__(self, h_size, input_len, activation=torch.nn.ELU(), kernel_size=8, stride=2):
        super().__init__()
        # Padding chosen so strided layers keep 'same'-style output lengths.
        pad = (((stride-1)*input_len-stride+kernel_size) // 2)
        conv_kwargs = dict(kernel_size=kernel_size, stride=stride, padding=pad)
        self.encoder = torch.nn.ModuleList()
        for cin, cout in [(h_size, 40), (40, 20), (20, 20), (20, 20), (20, 40)]:
            self.encoder.append(torch.nn.Conv1d(in_channels=cin, out_channels=cout, **conv_kwargs))
            self.encoder.append(activation)
            self.encoder.append(torch.nn.BatchNorm1d(cout))
        self.decoder = torch.nn.ModuleList()
        for cin, cout in [(40, 20), (20, 20), (20, 20), (20, 40)]:
            self.decoder.append(torch.nn.ConvTranspose1d(in_channels=cin, out_channels=cout, **conv_kwargs))
            self.decoder.append(activation)
            self.decoder.append(torch.nn.BatchNorm1d(cout))
        # Final projection back to one channel; no activation/norm so the
        # reconstructed signal's range is unconstrained.
        self.decoder.append(torch.nn.ConvTranspose1d(in_channels=40, out_channels=1, **conv_kwargs))
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.to(self.device)

    def forward(self, x):
        # (batch, seq_len, channels) -> (batch, channels, seq_len)
        x = torch.permute(x, (0, 2, 1))
        for layer in self.encoder:
            x = layer(x)
        for layer in self.decoder:
            x = layer(x)
        # (batch, 1, seq_len) -> (batch, seq_len)
        return x.squeeze()
### ===================================================== Utility Functions =====================================================
def decompose_signal(signal, wavelet='db2'):
    """Wavelet-decompose `signal` into a per-sample feature stack.

    Every wavelet coefficient band (approximation + details) is resampled
    back to the signal length and stacked channel-wise, with the raw signal
    appended as the final channel.  Output shape: (len(signal), n_bands + 1).
    """
    n_levels = max(pywt.dwt_max_level(len(signal), wavelet) - 3, 1)
    coeffs = pywt.wavedec(signal, wavelet=wavelet, level=n_levels)
    # Noise sigma estimate (MAD of the finest detail band over the 75th
    # percentile of a standard normal).  Kept from the original code even
    # though it is unused downstream.
    sigma = np.median(np.abs(coeffs[-1])) / scipy.stats.norm.ppf(0.75)
    resampled = [scipy.signal.resample(x=band, num=len(signal)) for band in coeffs]
    bands = np.stack(resampled, axis=-1)
    return np.concatenate([bands, signal[:, np.newaxis]], axis=1)
def rescale_signal(x):
    """Linearly rescale `x` into [-1, 1]; a constant signal maps to zeros."""
    lo = np.min(x)
    span = np.max(x) - lo
    if span == 0:
        # Degenerate flat signal: avoid dividing by zero.
        return np.zeros_like(x)
    return 2*(x - lo) / span - 1
def load_data(mitdb_dict, nstdb_extra_dict, window=512, train_end_ind=100000):
    """Build windowed (noisy, clean) pairs for the train and test splits.

    Train windows cover samples [0, train_end_ind); each test window uses
    the same offset shifted by train_end_ind.  Returns six arrays:
    train_x, train_y, train_noise_type, test_x, test_y, test_noise_type.
    """
    train_x, train_y, train_tags = [], [], []
    test_x, test_y, test_tags = [], [], []
    for subject in ['118', '119']:
        for noise_type in ['em', 'ma', 'bw', 'gn', 'all']:
            for dB in ['_6', '00', '06', '12', '18', '24']:
                clean = rescale_signal(mitdb_dict[subject]['MLII']['data'])
                noisy = rescale_signal(nstdb_extra_dict[subject+noise_type+dB]['MLII']['data'])
                tag = noise_type + dB
                for start in range(0, train_end_ind, window):
                    train_x.append(noisy[start:start+window])
                    train_y.append(clean[start:start+window])
                    train_tags.append(tag)
                    shifted = train_end_ind + start
                    test_x.append(noisy[shifted:shifted+window])
                    test_y.append(clean[shifted:shifted+window])
                    test_tags.append(tag)
    return (np.array(train_x), np.array(train_y), np.array(train_tags),
            np.array(test_x), np.array(test_y), np.array(test_tags))
def denoise_wavelet(signal, wavelet='db4', wavelet_levels=None, mode='soft', thesh_method='donoho', visualize=False):
    """VisuShrink wavelet denoising for a 1-D signal.

    Matches the output of
    ``skimage.restoration.denoise_wavelet(image=signal, sigma=None,
    wavelet='db4', mode='soft', wavelet_levels=None, convert2ycbcr=False,
    method='VisuShrink', rescale_sigma=True, channel_axis=None)`` exactly.
    The ``thesh_method`` parameter keeps its original spelling for
    backward compatibility; only 'donoho' is implemented.
    """
    if wavelet_levels is None:
        wavelet_levels = max(pywt.dwt_max_level(len(signal), wavelet) - 3, 1)
    coeffs = pywt.wavedec(signal, wavelet=wavelet, level=wavelet_levels)
    approx, details = coeffs[0], coeffs[1:]
    if thesh_method != 'donoho':
        raise NotImplementedError
    # Noise sigma from the finest detail band: MAD / Phi^{-1}(0.75).
    sigma = np.median(np.abs(details[-1])) / scipy.stats.norm.ppf(0.75)
    # Universal (VisuShrink) threshold of Donoho & Johnstone.
    threshold = sigma * np.sqrt(2 * np.log(len(signal)))
    if visualize:
        plt.figure(figsize=(15, 10))
        for i, band in enumerate(details):
            plt.subplot(wavelet_levels, 1, i+1)
            plt.plot(band)
        plt.show()
    thresholded = [pywt.threshold(data=band, value=threshold, mode=mode) for band in details]
    return pywt.waverec([approx] + thresholded, wavelet)
def denoise_emd(signal, sample_rate, visualize=False, P=5, beta=30):
    """EMD-based ECG denoising following Weng et al.

    The first P intrinsic mode functions (which carry QRS energy along with
    high-frequency noise) are kept only near detected R peaks via Tukey
    windows; the remaining lower-frequency IMFs are added back unmasked.

    Parameters
    ----------
    signal: 1-D ECG signal.
    sample_rate: Sampling rate in Hz.
    visualize: If True, show the R-wave delineation and IMF plots.
    P: Number of leading IMFs to window around R peaks (predefined rather
       than selected by the paper's t-test; clipped to n_imfs - 1 below).
    beta: Growth rate of the Tukey window width for higher-order IMFs.

    Returns the reconstructed signal, or the input unchanged when R-wave
    delineation fails.
    """
    # Predefine P as opposed to t-test
    ecg_cleaned = nk.ecg_clean(signal, sampling_rate=sample_rate, method="neurokit")
    signals, info = nk.ecg_peaks(ecg_cleaned, sampling_rate=sample_rate, method='kalidas2017')
    # Per-sample indicator of R-peak locations, and the peak indices themselves.
    rpeak_mask = signals.values.flatten()
    rpeaks = info['ECG_R_Peaks']
    try:
        # Half the average R-wave width (onset -> offset) in samples; this is
        # the flat-top width of the Tukey windows built below.
        signal_dwt, waves_dwt = nk.ecg_delineate(ecg_cleaned, rpeaks, sampling_rate=sample_rate, method="dwt", show=visualize, show_type='bounds_R')
        flat_window = int(np.nanmean(np.array(waves_dwt['ECG_R_Offsets'])-np.array(waves_dwt['ECG_R_Onsets']))/2)+1
    except Exception as e:
        # Delineation can fail on very noisy segments; deliberately fall back
        # to the raw signal instead of crashing (best-effort behaviour).
        return signal
    imf = emd.sift.sift(signal)  # columns are IMFs, highest frequency first
    if visualize:
        emd.plotting.plot_imfs(imf)
    reconstruction = np.zeros_like(signal)
    # Clip P so at least one IMF remains outside the masked set.
    P = min(P, imf.shape[1]-1)
    for i in range(P):
        # Window widens with IMF order: flat part plus 2*beta percent per order.
        tukey_window = scipy.signal.windows.tukey(M=int(flat_window + flat_window * 2*beta * (i+1)*.01))
        mask = np.convolve(rpeak_mask, tukey_window, mode='same')
        reconstruction += mask * imf[:,i]
    if P <= imf.shape[1]:
        # Always true after the clipping above; the remaining low-frequency
        # IMFs (trend/baseline) are added back without masking.
        reconstruction += imf[:,P:].sum(axis=1)
    return reconstruction
def denoise_nn(signal, model, window=512):
    """Denoise one or more fixed-length ECG windows with a trained network.

    Accepts a single window of shape ``(window,)`` or a batch of shape
    ``(n, window)``.  Each window is rescaled to [-1, 1] and expanded into
    the wavelet-feature stack the models were trained on.

    Parameters
    ----------
    signal: 1-D array of length `window`, or 2-D array (n, window).
    model: A fitted Net subclass exposing ``.predict(features)``.
    window: Expected window length (default 512, matching training;
        generalised from the previously hard-coded constant).

    Raises NotImplementedError for any other input shape.
    """
    if len(signal.shape) == 1 and len(signal) == window:
        features = [decompose_signal(rescale_signal(signal))]
    elif len(signal.shape) == 2 and signal.shape[1] == window:
        features = [decompose_signal(rescale_signal(w)) for w in signal]
    else:
        raise NotImplementedError
    return model.predict(np.array(features))
def get_power(signal):
    """Average power of the signal: the mean of the squared sample values."""
    return np.mean(np.asarray(signal) ** 2)
def calc_snr(signal, noise):
    """Signal-to-noise ratio in dB, from the average powers of both inputs."""
    log_ratio = np.log10(get_power(signal)) - np.log10(get_power(noise))
    return 10 * log_ratio
def calc_snr_ecg(signal, noise, sampling_rate):
    """Estimate ECG SNR in dB from R-peak amplitude versus noise power.

    Signal power is estimated from the peak-to-peak amplitude of detected
    R waves (a sinusoid with peak-to-peak amplitude V has power V**2 / 8);
    noise power from per-second windows of the noise signal.  Both estimates
    are trimmed to their 5th-95th percentile range to reject outliers.

    Bug fix: the original computed ``S = np.mean(peaks)**2 / 8`` using the
    R-peak *sample indices*, leaving the computed and filtered
    ``peak_ranges`` entirely unused; it also squared the already-squared
    per-window noise deviations.  Both terms are now consistent powers.
    """
    peaks = nk.ecg_peaks(signal, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    # +/- 50 ms window around each detected R peak.
    window = int(sampling_rate*.05)
    peak_ranges = np.array([np.ptp(signal[max(0, peak-window):min(len(signal), peak+window)]) for peak in peaks])
    peak_ranges = peak_ranges[(np.percentile(peak_ranges, 5)<peak_ranges) & (peak_ranges<np.percentile(peak_ranges, 95))]
    # Power of a sinusoid from its mean peak-to-peak amplitude.
    S = np.mean(peak_ranges)**2 / 8
    # Mean squared deviation (i.e. power) of each one-second noise window.
    noise_windows = [noise[i:i+sampling_rate] for i in range(0, len(noise), sampling_rate)]
    noise_powers = np.array([np.mean(np.power(w-w.mean(), 2)) for w in noise_windows])
    noise_powers = noise_powers[(np.percentile(noise_powers, 5)<noise_powers) & (noise_powers<np.percentile(noise_powers, 95))]
    N = np.mean(noise_powers)
    return 10*(np.log10(S) - np.log10(N))
### ===================================================== Driver for training NNs =====================================================
# import argparse
if __name__=="__main__":
    mitdb_dict = datasets.load_mitdb(data_path='/zfsauton/project/public/chufang/mitdb/', verbose=False)
    nstdb_extra_dict = datasets.load_nstdb_extra(nstdb_path='/zfsauton/project/public/chufang/nstdb/', mitdb_path='/zfsauton/project/public/chufang/mitdb/', verbose=False, downsampled_sampling_rate=125)
    train_x, train_y, train_noise_type, test_x, test_y, test_noise_type = load_data(mitdb_dict, nstdb_extra_dict, window=512)
    # Expand each window into its wavelet-feature stack: (seq_len, channels).
    train_x = np.array([decompose_signal(x_) for x_ in train_x])
    test_x = np.array([decompose_signal(x_) for x_ in test_x])
    print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)
    train = False  # False: load saved weights; True: retrain and overwrite them

    def run_experiment(model, name):
        """Train (or load) `model`, then save its test-set predictions.

        Factored out of three identical copy-pasted train/eval sections;
        file names match the originals exactly.
        """
        if train:
            model.fit(train_x, train_y, epochs=30, batch_size=16, criterion=torch.nn.MSELoss(reduction='mean'))
            torch.save(model.cpu().state_dict(), 'denoising_models/{}.pt'.format(name))
        else:
            model.load_state_dict(torch.load('denoising_models/{}.pt'.format(name)))
        predictions = model.predict(test_x)
        np.save('denoising_models/{}_preds.npy'.format(name), predictions)

    ## Training LSTM ---------------------------------------------------------------
    torch.manual_seed(0)
    np.random.seed(0)
    run_experiment(LSTMNet(input_dim=6, hidden_dim=64, layer_dim=1), 'lstm')
    ## Training 1dCNN ---------------------------------------------------------------
    torch.manual_seed(0)
    np.random.seed(0)
    run_experiment(Conv1DNet(h_sizes=[6, 256, 128, 64, 64, 1], activation=torch.nn.LeakyReLU()), 'cnn')
    ## Training 1dCNNAutoencoder ---------------------------------------------------------------
    torch.manual_seed(0)
    np.random.seed(0)
    run_experiment(Conv1DAutoencoderNet(h_size=6, input_len=train_x.shape[1], activation=torch.nn.LeakyReLU(), kernel_size=8, stride=2), 'cnnae')
Functions
def calc_snr(signal, noise)-
Expand source code
def calc_snr(signal, noise): signal_avg_watts = get_power(signal) noise_avg_watts = get_power(noise) return 10*(np.log10(signal_avg_watts) - np.log10(noise_avg_watts)) def calc_snr_ecg(signal, noise, sampling_rate)-
Expand source code
def calc_snr_ecg(signal, noise, sampling_rate): peaks = nk.ecg_peaks(signal, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks'] window = int(sampling_rate*.05) peak_ranges = np.array([np.ptp(signal[max(0, peak-window):min(len(signal), peak+window)]) for peak in peaks]) peak_ranges = peak_ranges[(np.percentile(peak_ranges, 5)<peak_ranges) & (peak_ranges<np.percentile(peak_ranges, 95))] S = np.mean(peaks)**2 / 8 noise_windows = [noise[i:i+sampling_rate] for i in range(0, len(noise), sampling_rate)] noise_rmsds = np.array([np.mean(np.power(w-w.mean(),2)) for w in noise_windows]) noise_rmsds = noise_rmsds[(np.percentile(noise_rmsds, 5)<noise_rmsds) & (noise_rmsds<np.percentile(noise_rmsds, 95))] N = np.mean(noise_rmsds)**2 return 10*(np.log10(S) - np.log10(N)) def decompose_signal(signal, wavelet='db2')-
Expand source code
def decompose_signal(signal, wavelet='db2'): wavelet_levels = pywt.dwt_max_level(len(signal), wavelet) wavelet_levels = max(wavelet_levels - 3, 1) coeffs = pywt.wavedec(signal, wavelet=wavelet, level=wavelet_levels) # Detail coefficients at each decomposition level dcoeffs = coeffs[1:] # Estimate the noise via the method in [2]_ detail_coeffs = dcoeffs[-1] # 75th quantile of the underlying, symmetric noise distribution denom = scipy.stats.norm.ppf(0.75) sigma = np.median(np.abs(detail_coeffs)) / denom # # The VisuShrink thresholds from donoho et al. # threshold = sigma*np.sqrt(2*np.log(len(signal))) # denoised_detail = [pywt.threshold(data=level, value=threshold, mode='soft') for level in dcoeffs] # # print([c.shape[0] for c in denoised_detail], np.sum([c.shape[0] for c in denoised_detail])) denoised_detail = [scipy.signal.resample(x=c, num=len(signal)) for c in coeffs] # # print(np.stack(denoised_detail, axis=-1).shape) denoised_detail = np.stack(denoised_detail, axis=-1) return np.concatenate([denoised_detail, signal[:,np.newaxis]], axis=1) def denoise_emd(signal, sample_rate, visualize=False, P=5, beta=30)-
Follows the approach as defined by Weng et al.
Expand source code
def denoise_emd(signal, sample_rate, visualize=False, P=5, beta=30): """Follows the approach as defined by Weng et al. """ # Predefine P as opposed to t-test ecg_cleaned = nk.ecg_clean(signal, sampling_rate=sample_rate, method="neurokit") signals, info = nk.ecg_peaks(ecg_cleaned, sampling_rate=sample_rate, method='kalidas2017') rpeak_mask = signals.values.flatten() rpeaks = info['ECG_R_Peaks'] # print(rpeaks) # plt.show() # print(len(rpeaks), len(waves_dwt['ECG_R_Offsets']), len(waves_dwt['ECG_R_Onsets'])) try: signal_dwt, waves_dwt = nk.ecg_delineate(ecg_cleaned, rpeaks, sampling_rate=sample_rate, method="dwt", show=visualize, show_type='bounds_R') flat_window = int(np.nanmean(np.array(waves_dwt['ECG_R_Offsets'])-np.array(waves_dwt['ECG_R_Onsets']))/2)+1 except Exception as e: return signal imf = emd.sift.sift(signal) if visualize: emd.plotting.plot_imfs(imf) reconstruction = np.zeros_like(signal) P = min(P, imf.shape[1]-1) for i in range(P): tukey_window = scipy.signal.windows.tukey(M=int(flat_window + flat_window * 2*beta * (i+1)*.01)) mask = np.convolve(rpeak_mask, tukey_window, mode='same') reconstruction += mask * imf[:,i] if P <= imf.shape[1]: reconstruction += imf[:,P:].sum(axis=1) return reconstruction def denoise_nn(signal, model)-
Expand source code
def denoise_nn(signal, model): # assume that input signals are of shape (n,512) if len(signal.shape) == 1 and len(signal) == 512: signals = [decompose_signal(rescale_signal(signal))] return model.predict(np.array(signals)) elif len(signal.shape) == 2 and signal.shape[1] == 512: signals = [decompose_signal(rescale_signal(signal_)) for signal_ in signal] return model.predict(np.array(signals)) else: raise NotImplementedError def denoise_wavelet(signal, wavelet='db4', wavelet_levels=None, mode='soft', thesh_method='donoho', visualize=False)-
Based off of the official skimage implementation: https://scikit-image.org/docs/stable/api/skimage.restoration.html#skimage.restoration.denoise_wavelet
import skimage temp1 = skimage.restoration.denoise_wavelet(image=example_signal, sigma=None, wavelet='db4', mode='soft', wavelet_levels=None, convert2ycbcr=False, method='VisuShrink', rescale_sigma=True, channel_axis=None) temp2 = denoise_wavelet(example_signal) print(np.all(temp1==temp2)) # returns True
Expand source code
def denoise_wavelet(signal, wavelet='db4', wavelet_levels=None, mode='soft', thesh_method='donoho', visualize=False): """Based off of the official skimage implementation: https://scikit-image.org/docs/stable/api/skimage.restoration.html#skimage.restoration.denoise_wavelet > import skimage > temp1 = skimage.restoration.denoise_wavelet(image=example_signal, sigma=None, wavelet='db4', mode='soft', > wavelet_levels=None, convert2ycbcr=False, method='VisuShrink', > rescale_sigma=True, channel_axis=None) > temp2 = denoise_wavelet(example_signal) > print(np.all(temp1==temp2)) # returns True """ if wavelet_levels is None: wavelet_levels = pywt.dwt_max_level(len(signal), wavelet) wavelet_levels = max(wavelet_levels - 3, 1) coeffs = pywt.wavedec(signal, wavelet=wavelet, level=wavelet_levels) # Detail coefficients at each decomposition level dcoeffs = coeffs[1:] # Estimate the noise via the method in [2]_ detail_coeffs = dcoeffs[-1] if thesh_method == 'donoho': # 75th quantile of the underlying, symmetric noise distribution denom = scipy.stats.norm.ppf(0.75) sigma = np.median(np.abs(detail_coeffs)) / denom # The VisuShrink thresholds from donoho et al. threshold = sigma*np.sqrt(2*np.log(len(signal))) else: raise NotImplementedError if visualize: plt.figure(figsize=(15,10)) for i, level in enumerate(dcoeffs): plt.subplot(wavelet_levels, 1, i+1) plt.plot(level) # plt.plot(pywt.threshold(data=level, value=threshold, mode=mode)) plt.show() denoised_detail = [pywt.threshold(data=level, value=threshold, mode=mode) for level in dcoeffs] denoised_coeffs = [coeffs[0]] + denoised_detail return pywt.waverec(denoised_coeffs, wavelet) def get_power(signal)-
Expand source code
def get_power(signal): return np.mean(np.power(signal, 2)) def load_data(mitdb_dict, nstdb_extra_dict, window=512, train_end_ind=100000)-
Expand source code
def load_data(mitdb_dict, nstdb_extra_dict, window=512, train_end_ind=100000): train_x = [] train_y = [] train_noise_type = [] test_x = [] test_y = [] test_noise_type = [] # print(len(mitdb_dict[subject]['MLII']['data'])) for subject in ['118', '119']: for noise_type in ['em', 'ma', 'bw', 'gn', 'all']: # for noise_type in ['gn']: # for dB in ['_6','00','06','12','18','24']: for dB in ['_6','00','06','12','18','24']: orig = mitdb_dict[subject]['MLII']['data'] orig = rescale_signal(orig) noisy = nstdb_extra_dict[subject+noise_type+dB]['MLII']['data'] noisy = rescale_signal(noisy) for i in range(0, train_end_ind, window): train_x.append(noisy[i:i+window]) train_y.append(orig[i:i+window]) train_noise_type.append(noise_type+dB) test_x.append(noisy[train_end_ind+i:train_end_ind+i+window]) test_y.append(orig[train_end_ind+i:train_end_ind+i+window]) test_noise_type.append(noise_type+dB) train_x = np.array(train_x) train_y = np.array(train_y) train_noise_type = np.array(train_noise_type) test_x = np.array(test_x) test_y = np.array(test_y) test_noise_type = np.array(test_noise_type) return train_x, train_y, train_noise_type, test_x, test_y, test_noise_type def rescale_signal(x)-
Expand source code
def rescale_signal(x): r = np.max(x) - np.min(x) if r == 0: return np.zeros_like(x) else: return 2*(x-np.min(x)) / r - 1
Classes
class Conv1DAutoencoderNet (h_size, input_len, activation=ELU(alpha=1.0), kernel_size=8, stride=2)-
Base class for all neural network modules.
Your models should also subclass this class.
Modules can also contain other Modules, allowing to nest them in a tree structure. You can assign the submodules as regular attributes::
import torch.nn as nn import torch.nn.functional as F class Model(nn.Module): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 20, 5) self.conv2 = nn.Conv2d(20, 20, 5) def forward(self, x): x = F.relu(self.conv1(x)) return F.relu(self.conv2(x))Submodules assigned in this way will be registered, and will have their parameters converted too when you call :meth:
to, etc.Note
As per the example above, an
__init__()call to the parent class must be made before assignment on the child.:ivar training: Boolean represents whether this module is in training or evaluation mode. :vartype training: bool
Initializes internal Module state, shared by both nn.Module and ScriptModule.
Expand source code
class Conv1DAutoencoderNet(Net): def __init__(self, h_size, input_len, activation=torch.nn.ELU(), kernel_size=8, stride=2): super().__init__() pad = (((stride-1)*input_len-stride+kernel_size) // 2) self.encoder = torch.nn.ModuleList() self.encoder.append(torch.nn.Conv1d(in_channels=h_size, out_channels=40, kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.encoder.append(torch.nn.BatchNorm1d(40)) self.encoder.append(torch.nn.Conv1d(in_channels=40, out_channels=20, kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.encoder.append(torch.nn.BatchNorm1d(20)) self.encoder.append(torch.nn.Conv1d(in_channels=20, out_channels=20, kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.encoder.append(torch.nn.BatchNorm1d(20)) self.encoder.append(torch.nn.Conv1d(in_channels=20, out_channels=20, kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.encoder.append(torch.nn.BatchNorm1d(20)) self.encoder.append(torch.nn.Conv1d(in_channels=20, out_channels=40, kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.encoder.append(torch.nn.BatchNorm1d(40)) # self.encoder.append(torch.nn.Conv1d(in_channels=40, out_channels=1, kernel_size=kernel_size, stride=stride, padding=pad)) # self.encoder.append(activation) # self.encoder.append(torch.nn.BatchNorm1d(1)) self.decoder = torch.nn.ModuleList() # self.decoder.append(torch.nn.ConvTranspose1d(in_channels=1, out_channels=2, kernel_size=kernel_size, stride=stride, padding=pad)) # self.encoder.append(activation) # self.decoder.append(torch.nn.BatchNorm1d(1)) # self.decoder.append(torch.nn.ConvTranspose1d(in_channels=1, out_channels=40, kernel_size=kernel_size, stride=stride, padding=pad)) # self.encoder.append(activation) # self.decoder.append(torch.nn.BatchNorm1d(40)) self.decoder.append(torch.nn.ConvTranspose1d(in_channels=40, out_channels=20, 
kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.decoder.append(torch.nn.BatchNorm1d(20)) self.decoder.append(torch.nn.ConvTranspose1d(in_channels=20, out_channels=20, kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.decoder.append(torch.nn.BatchNorm1d(20)) self.decoder.append(torch.nn.ConvTranspose1d(in_channels=20, out_channels=20, kernel_size=kernel_size, stride=stride, padding=pad)) self.encoder.append(activation) self.decoder.append(torch.nn.BatchNorm1d(20)) self.decoder.append(torch.nn.ConvTranspose1d(in_channels=20, out_channels=40, kernel_size=kernel_size, stride=stride, padding=pad)) self.decoder.append(torch.nn.BatchNorm1d(40)) self.encoder.append(activation) self.decoder.append(torch.nn.ConvTranspose1d(in_channels=40, out_channels=1, kernel_size=kernel_size, stride=stride, padding=pad)) self.device = 'cuda' if torch.cuda.is_available() else 'cpu' self.to(self.device) def forward(self, x): x = torch.permute(x, (0,2,1)) for layer in self.encoder: x = layer(x) for layer in self.decoder: x = layer(x) return x.squeeze()Ancestors
- Net
- torch.nn.modules.module.Module
Class variables
var dump_patches : boolvar training : bool
Inherited members
class Conv1DNet (h_sizes=[6, 768, 256, 64, 1], activation=LeakyReLU(negative_slope=0.01))-
Base class for all neural network modules.
Your models should also subclass this class.
Modules can also contain other Modules, allowing to nest them in a tree structure. You can assign the submodules as regular attributes::
import torch.nn as nn import torch.nn.functional as F class Model(nn.Module): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 20, 5) self.conv2 = nn.Conv2d(20, 20, 5) def forward(self, x): x = F.relu(self.conv1(x)) return F.relu(self.conv2(x))Submodules assigned in this way will be registered, and will have their parameters converted too when you call :meth:
to, etc.Note
As per the example above, an
__init__()call to the parent class must be made before assignment on the child.:ivar training: Boolean represents whether this module is in training or evaluation mode. :vartype training: bool
Initializes internal Module state, shared by both nn.Module and ScriptModule.
Expand source code
class Conv1DNet(Net): def __init__(self, h_sizes=[6, 768, 256, 64, 1], activation=torch.nn.LeakyReLU()): super(Conv1DNet, self).__init__() self.layers = torch.nn.ModuleList() for k in range(len(h_sizes)-1): # self.layers.append(torch.nn.Linear(h_sizes[k], h_sizes[k+1])) self.layers.append(torch.nn.Conv1d(in_channels=h_sizes[k], out_channels=h_sizes[k+1], kernel_size=5, padding='same')) self.layers.append(activation) if k != len(h_sizes)-1: self.layers.append(torch.nn.BatchNorm1d(h_sizes[k+1])) # self.layers.append(torch.nn.Dropout(p=0.5)) self.device = 'cuda' if torch.cuda.is_available() else 'cpu' self.to(self.device) def forward(self, x): x = torch.permute(x, (0,2,1)) for layer in self.layers: x = layer(x) return x.squeeze()Ancestors
- Net
- torch.nn.modules.module.Module
Class variables
var dump_patches : boolvar training : bool
Inherited members
class LSTMNet (input_dim, hidden_dim, layer_dim, output_dim=1, activation=LeakyReLU(negative_slope=0.01))-
Base class for all neural network modules.
Your models should also subclass this class.
Modules can also contain other Modules, allowing to nest them in a tree structure. You can assign the submodules as regular attributes::
import torch.nn as nn import torch.nn.functional as F class Model(nn.Module): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 20, 5) self.conv2 = nn.Conv2d(20, 20, 5) def forward(self, x): x = F.relu(self.conv1(x)) return F.relu(self.conv2(x))Submodules assigned in this way will be registered, and will have their parameters converted too when you call :meth:
to, etc.Note
As per the example above, an
__init__()call to the parent class must be made before assignment on the child.:ivar training: Boolean represents whether this module is in training or evaluation mode. :vartype training: bool
Initializes internal Module state, shared by both nn.Module and ScriptModule.
Expand source code
class LSTMNet(Net):
    """LSTM regressor over (batch, seq_len, input_dim) sequences.

    Inherits ``fit``/``predict`` from ``Net``; ``forward`` produces one
    output per time step, squeezed to shape (batch, seq_len) when
    output_dim == 1.

    Parameters
    ----------
    input_dim: Number of features per time step.
    hidden_dim: LSTM hidden state size.
    layer_dim: Number of stacked LSTM layers.
    output_dim: Per-timestep output size (default 1).
    activation: Activation applied to the LSTM outputs before the final
        linear projection.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim=1, activation=torch.nn.LeakyReLU()):
        super(LSTMNet, self).__init__()
        # BUGFIX: Net.__init__ populated self.layers with its default Linear
        # stack.  LSTMNet.forward never uses self.layers, but those modules
        # stayed registered, so their parameters were trained and checkpointed
        # as dead weight.  Reset to an empty ModuleList.
        self.layers = torch.nn.ModuleList()
        self.layer_dim = layer_dim
        self.hidden_dim = hidden_dim
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.activation = activation
        self.lstm = torch.nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = torch.nn.Linear(hidden_dim, output_dim)
        self.to(self.device)

    def forward(self, x):
        # Fresh zero-valued hidden and cell states for every batch.  The
        # original called .requires_grad_() and then .detach() on these; both
        # are no-ops for freshly created zeros, so they are omitted here.
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(self.device)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(self.device)
        # out shape: (batch_size, seq_len, hidden_dim)
        out, (hn, cn) = self.lstm(x, (h0, c0))
        out = self.activation(out)
        # Project each time step's hidden state to output_dim.
        out = self.fc(out)          # (batch_size, seq_len, output_dim)
        # NOTE: squeeze() drops all size-1 dims — with batch size 1 this also
        # removes the batch dimension.
        out = out.squeeze()         # (batch_size, seq_len) when output_dim == 1
        return out
- Net
- torch.nn.modules.module.Module
Class variables
var dump_patches : bool
var training : bool
Inherited members
class Net (h_sizes=[768, 256, 64, 2], activation=LeakyReLU(negative_slope=0.01))-
Base class for all neural network modules.
Your models should also subclass this class.
Modules can also contain other Modules, allowing to nest them in a tree structure. You can assign the submodules as regular attributes::
    import torch.nn as nn
    import torch.nn.functional as F

    class Model(nn.Module):
        def __init__(self):
            super().__init__()
            self.conv1 = nn.Conv2d(1, 20, 5)
            self.conv2 = nn.Conv2d(20, 20, 5)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            return F.relu(self.conv2(x))

Submodules assigned in this way will be registered, and will have their parameters converted too when you call :meth:`to`, etc.

Note: As per the example above, an `__init__()` call to the parent class must be made before assignment on the child.

:ivar training: Boolean representing whether this module is in training or evaluation mode.
:vartype training: bool
Initializes internal Module state, shared by both nn.Module and ScriptModule.
Expand source code
class Net(torch.nn.Module):
    """Flexible feed-forward MLP with numpy-in/numpy-out training helpers.

    Parameters
    ----------
    h_sizes: Linear layer sizes to be used in the MLP; h_sizes[0] is the
        input dimension and h_sizes[-1] the output dimension.
    activation: Activation module inserted after *every* Linear layer,
        including the last one (original behavior, preserved).

    Note: the original placed this description as a string statement after
    ``super().__init__()`` inside ``__init__``, where it was not a real
    docstring; it is promoted to the class docstring here.
    """

    def __init__(self, h_sizes=[768, 256, 64, 2], activation=torch.nn.LeakyReLU()):
        super(Net, self).__init__()
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.layers = torch.nn.ModuleList()
        for k in range(len(h_sizes) - 1):
            self.layers.append(torch.nn.Linear(h_sizes[k], h_sizes[k + 1]))
            # Activation is appended after every Linear, including the final
            # one — the network output passes through the activation.
            self.layers.append(activation)
        self.to(self.device)

    def forward(self, x):
        """Apply every registered layer in order."""
        for layer in self.layers:
            x = layer(x)
        return x

    def fit(self, x_train, y_train, batch_size=32, epochs=10, criterion=torch.nn.MSELoss(), lr=1e-3, weight_decay=1e-4, shuffle=True, seed=0):
        """Train with Adam on numpy arrays, shuffling each epoch.

        Raises AssertionError when inputs are not numpy arrays (original
        contract, preserved).
        """
        assert isinstance(x_train, np.ndarray) and isinstance(y_train, np.ndarray)
        np.random.seed(seed)
        x_train = torch.from_numpy(x_train).float()
        y_train = torch.from_numpy(y_train).float()
        optimizer = torch.optim.Adam(self.parameters(), lr=lr, weight_decay=weight_decay)
        print(self.device)
        for epoch in tqdm(range(epochs)):
            if shuffle:
                inds = np.arange(len(x_train))
                np.random.shuffle(inds)
                x_train = x_train[inds]
                y_train = y_train[inds]
            for i in range(0, len(x_train), batch_size):
                batch_x = x_train[i:i + batch_size].to(self.device)
                batch_y = y_train[i:i + batch_size].to(self.device)
                out = self.forward(batch_x)
                loss = criterion(out, batch_y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                # BUGFIX: the original tested `i % 10 == 0`, but i advances in
                # steps of batch_size, so logging only fired when i happened to
                # be a multiple of both.  Log every 10th batch instead.
                if (i // batch_size) % 10 == 0:
                    tqdm.write("loss: {:.6f}".format(loss.detach().cpu().numpy()))

    def predict(self, x, batch_size=200):
        """Batched inference on a numpy array; returns a numpy array.

        NOTE(review): does not call self.eval(), so subclasses containing
        BatchNorm/Dropout run in whatever mode the module is currently in —
        confirm intended before relying on it.
        """
        with torch.no_grad():
            x = torch.from_numpy(x).float()
            out = []
            for i in range(0, len(x), batch_size):
                batch_x = x[i:i + batch_size].to(self.device)
                out.append(self.forward(batch_x).cpu().numpy())
            return np.concatenate(out)
- torch.nn.modules.module.Module
Subclasses
Class variables
var dump_patches : bool
var training : bool
Methods
def fit(self, x_train, y_train, batch_size=32, epochs=10, criterion=MSELoss(), lr=0.001, weight_decay=0.0001, shuffle=True, seed=0)-
Expand source code
def fit(self, x_train, y_train, batch_size=32, epochs=10, criterion=torch.nn.MSELoss(), lr=1e-3, weight_decay=1e-4, shuffle=True, seed=0):
    """Train the module with Adam over mini-batches of numpy data.

    Inputs must be numpy arrays; they are converted to float tensors and,
    when ``shuffle`` is set, re-permuted at the start of every epoch.
    """
    assert type(x_train) == np.ndarray and type(y_train) == np.ndarray
    np.random.seed(seed)
    inputs = torch.from_numpy(x_train).float()
    targets = torch.from_numpy(y_train).float()
    optimizer = torch.optim.Adam(self.parameters(), lr=lr, weight_decay=weight_decay)
    print(self.device)
    n_samples = len(inputs)
    for _epoch in tqdm(range(epochs)):
        if shuffle:
            perm = np.arange(n_samples)
            np.random.shuffle(perm)
            inputs = inputs[perm]
            targets = targets[perm]
        for start in range(0, n_samples, batch_size):
            xb = inputs[start:start + batch_size].to(self.device)
            yb = targets[start:start + batch_size].to(self.device)
            loss = criterion(self.forward(xb), yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Same cadence as the original: fires when the batch offset is a
            # multiple of 10 (with batch_size=32 that means every 160 samples).
            if start % 10 == 0:
                tqdm.write("loss: {:.6f}".format(loss.detach().cpu().numpy()))
Defines the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for forward pass needs to be defined within this function, one should call the :class:
Moduleinstance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.Expand source code
def forward(self, x):
    """Feed ``x`` through every registered layer, in order, and return the result."""
    out = x
    for module in self.layers:
        out = module(out)
    return out
Expand source code
def predict(self, x, batch_size=200):
    """Run gradient-free inference over a numpy array in mini-batches.

    Each batch is moved to ``self.device``, passed through ``self.forward``,
    brought back to CPU, and the per-batch numpy results are concatenated.
    """
    with torch.no_grad():
        tensor = torch.from_numpy(x).float()
        chunks = []
        start = 0
        total = len(tensor)
        while start < total:
            batch = tensor[start:start + batch_size].to(self.device)
            chunks.append(self.forward(batch).cpu().numpy())
            start += batch_size
        return np.concatenate(chunks)