Module signal_quality.datasets
Expand source code
import numpy as np
import wfdb
import matplotlib.pyplot as plt
import biosppy
import scipy
import sklearn
from tqdm import tqdm
def load_ecg_by_windows(channel, rates, noise_threshold, window_size=60*125, step=30*125):
"""
channel = the entire raw ECG waveform of shape (length,)
rates = the processed (numeric) annotations of same shape as ECG (length,)
sampling_rate = the hz of the input ECG signal
downsampled_sampling_rate = the hz to downsample to
window_size = the window, after downsampling, of indices in a feature window
"""
# Split into individual heartbeats. For each heartbeat
# record, append classification (normal/abnormal).
labels = []
beats = []
# Skip first and last beat.
for idxval in range(window_size, len(channel), step):
# Get the classification value that is on
# or near the position of the rpeak index.
# set as abnormal beat if it exists
if rates is not None:
window_labels = rates[max(0,idxval-window_size):idxval]
# Skip beat if there is no classification.
if (window_labels==-1.0).sum() == len(window_labels):
continue
# print((window_labels==1.0).sum() / len(window_labels))
if ((window_labels==1.0).sum() / len(window_labels)) > noise_threshold:
catval = 1
else:
catval = 0
labels.append(catval)
# Append some extra readings around the beat.
beat = channel[max(0,idxval-window_size):idxval]
# # Normalize the readings to a 0-1 range for ML purposes.
# beat_range = beat.max() - beat.min()
# if beat_range == 0:
# continue
# beat = (beat - beat.min()) / beat_range
beats.append(beat)
# return data and labels
return beats, labels
def load_ecg_beat_by_beat(channel, rates, sampling_rate, beat_window=90, show=False):
"""
channel = the entire raw ECG waveform of shape (length,)
rates = the processed (numeric) annotations of same shape as ECG (length,)
sampling_rate = the hz of the input ECG signal
downsampled_sampling_rate = the hz to downsample to
beat_window = the window, after downsampling, of indices to get around the peak
show = a boolean value whether to show the obtained peaks
"""
# Instead of using the annotations to find the beats, we will
# use R-peak detection instead. The reason for this is so that
# the same logic can be used to analyze new and un-annotated
# ECG data. We use the annotations here only to classify the
# beat as either Normal or Abnormal and to train the model.
# Find rpeaks in the ECG data. Most should match with
# the annotations.
# biosppy.signals.ecg.ecg returns:
# names = ('ts', 'filtered', 'rpeaks', 'templates_ts', 'templates',
# 'heart_rate_ts', 'heart_rate')
out = biosppy.signals.ecg.ecg(signal=channel, sampling_rate=sampling_rate, show=show)
# Split into individual heartbeats. For each heartbeat
# record, append classification (normal/abnormal).
labels = []
beats = []
# Skip first and last beat.
for idxval in out['rpeaks'][1:-1]:
# Get the classification value that is on
# or near the position of the rpeak index.
# set as abnormal beat if it exists
if rates is not None:
catval = rates[max(0,idxval-beat_window):idxval+beat_window].max()
# Skip beat if there is no classification.
if catval == -1.0:
continue
labels.append(catval)
# Append some extra readings around the beat.
beat = channel[max(0,idxval-beat_window):idxval+beat_window]
if show:
plt.plot(beat)
plt.scatter(beat_window, 1, c='r', label='detected peak')
plt.legend(loc='upper right'); plt.grid(); plt.ylabel('ECG'); plt.xlabel(str(sampling_rate)+' hz'); plt.show()
# # Normalize the readings to a 0-1 range for ML purposes.
# beat_range = beat.max() - beat.min()
# if beat_range == 0:
# continue
# beat = (beat - beat.min()) / beat_range
beats.append(beat)
# return data and labels
return beats, labels
def get_power(signal):
"""" Helper function to return average power of signal
"""
return np.mean(np.power(signal, 2))
def calc_snr(signal, noise):
"""" Calculates signal to noise ratio
"""
signal_avg_watts = get_power(signal)
noise_avg_watts = get_power(noise)
return 10*(np.log10(signal_avg_watts) - np.log10(noise_avg_watts))
def add_noise(signal, method, target_snr_db=18, random_state=0, verbose=False):
""" Adds noise of specified target_snr_db dB to signal
method = type of noise to add
"""
np.random.seed(random_state)
# Adding noise using target SNR
# Calculate signal power and convert to dB
sig_avg_watts = get_power(signal)
sig_avg_db = 10 * np.log10(sig_avg_watts)
# Calculate noise according to [2] then convert to watts
noise_avg_db = sig_avg_db - target_snr_db
noise_avg_watts = 10 ** (noise_avg_db / 10)
if method == 'gaussian':
# Generate an sample of white noise
noise_volts = np.random.normal(0, np.sqrt(noise_avg_watts), len(signal))
else:
raise NotImplementedError
if verbose:
print('SNR', calc_snr(signal, noise_volts))
# Noise up the original signal
return signal + noise_volts
def add_noisy_signal(signal, noise, target_snr_db=18, verbose=False):
""" Adds noise of specified target_snr_db dB to signal via rescaling
"""
# Calculate signal power and convert to dB
sig_avg_watts = get_power(signal)
sig_avg_db = 10 * np.log10(sig_avg_watts)
# Calculate noise according to [2] then convert to watts
noise_avg_db = sig_avg_db - target_snr_db
noise_avg_watts = 10 ** (noise_avg_db / 10)
# rescale noise
noise -= np.mean(noise)
noise *= np.sqrt(noise_avg_watts) / noise.std()
if verbose:
print('SNR', calc_snr(signal, noise))
# Noise up the original signal
return signal + noise
def load_nstdb_extra(nstdb_path, mitdb_path, verbose=False, downsampled_sampling_rate=125):
"""
https://physionet.org/content/nstdb/1.0.0/
## 1. Install the WFDB package: https://www.physionet.org/content/wfdb/
## Source: Moody, G., Pollard, T., & Moody, B. (2021). WFDB Software Package (version 10.6.2). PhysioNet. https://doi.org/10.13026/zzpx-h016.
## Source: Goldberger, A., Amaral, L., Glass, L., Hausdorff, J., Ivanov, P. C., Mark, R., ... & Stanley, H. E. (2000). PhysioBank, PhysioToolkit, and PhysioNet:
## Components of a new research resource for complex physiologic signals. Circulation [Online]. 101 (23), pp. e215–e220.
##
## 2. Download the MIT-BIH Noise Stress Test Database: https://physionet.org/content/nstdb/ and rename it to "nstdb"
## Source: Moody GB, Muldrow WE, Mark RG. A noise stress test for arrhythmia detectors. Computers in Cardiology 1984; 11:381-384.
##
## 3. Download the MIT-BIH: https://physionet.org/content/mitdb/ and rename it to "mitdb"
## Source: Moody GB, Mark RG. The impact of the MIT-BIH Arrhythmia Database. IEEE Eng in Med and Biol 20(3):45-50 (May-June 2001). (PMID: 11446209)
##
## 4. Run "bash generate_nstdb_extra.sh" to create the nstdb_extra directory
## This takes in 2 clean ECGs (118 and 119) and adds 3 types of noise (baseline wander, muscle EMG artifact, electrode motion artifact)
## at different signal-to-noise ratios (24,18,12,6,0,-6) in dB. Original script only adds one type of noise, this script adds all 3 types.
##
## Source: https://physionet.org/content/nstdb/1.0.0/nstdbgen
## Source: https://physionet.org/physiotools/wag/nst-1.htm
##
data_path = path where the .dat, .hea, .atr files are located for every patient
"""
subjects = ['118', '119']
output_dict = {}
for subject in subjects:
# Read in the data
record = wfdb.rdrecord(mitdb_path+subject)
data = record.p_signal.transpose()
for noise_type in ['em', 'ma', 'bw', 'gn']:
for dB in ['_6','00','06','12','18','24']:
int_db = int(dB.replace('_','-'))
subject_str = subject+noise_type+dB
output_dict[subject_str] = {}
# Process each channel separately (2 per input file).
for channelid, channel in enumerate(data):
chname = record.sig_name[channelid]
# Resample from 360Hz to 125Hz
newsize = int((len(channel) * downsampled_sampling_rate / record.fs) + 0.5)
channel = scipy.signal.resample(channel, newsize)
if verbose:
print('Name',subject_str,'ECG channel type:',chname)
if noise_type in ['em', 'ma', 'bw']:
noise_record = wfdb.rdrecord(nstdb_path+noise_type)
noise_data = noise_record.p_signal.transpose()
noise_channel = noise_data[channelid%2] # alternate channels of noise to add, per literature
newsize = int((len(noise_channel) * downsampled_sampling_rate / record.fs) + 0.5)
noise_channel = scipy.signal.resample(noise_channel, newsize)
output_dict[subject_str][chname] = {'data' : add_noisy_signal(channel, noise_channel, target_snr_db=int_db), 'labels' : None}
elif noise_type=='gn':
output_dict[subject_str][chname] = {'data' : add_noise(signal=channel, method='gaussian', target_snr_db=int_db), 'labels' : None}
# noise_type=='all':
noise_type = 'all'
for dB in ['_6','00','06','12','18','24']:
int_db = int(dB.replace('_','-'))
subject_str = subject+noise_type+dB
output_dict[subject_str] = {}
# Process each channel separately (2 per input file).
for channelid, channel in enumerate(data):
chname = record.sig_name[channelid]
# Resample from 360Hz to 125Hz
newsize = int((len(channel) * downsampled_sampling_rate / record.fs) + 0.5)
channel = scipy.signal.resample(channel, newsize)
all_noise = np.zeros_like(channel)
for noise_type_ in ['em', 'ma', 'bw', 'gn']:
all_noise += (output_dict[subject+noise_type_+dB][chname]['data'] - channel)
output_dict[subject_str][chname] = {'data' : add_noisy_signal(channel, all_noise, target_snr_db=int_db), 'labels' : None}
return output_dict
def load_mitdb(data_path, verbose=False, downsampled_sampling_rate=125):
"""
https://physionet.org/content/mitdb/1.0.0/
data_path = path where the .dat, .hea, .atr files are located for every patient
"""
realbeats = ['L','R','B','A','a','J','S','V','r',
'F','e','j','n','E','/','f','Q','?']
normalbeats = ['N']
# Loop through each input file. Each file contains one
# record of ECG readings, sampled at 360 readings per
# second.
# np.warnings.filterwarnings('error', category=np.VisibleDeprecationWarning)
subjects = [
'100','101','102','103','104', '105','106','107','108','109',
'111','112','113','114','115', '116','117','118','119','121',
'122','123','124','200','201', '202','203','205','207','208',
'209','210','212','213','214', '215','217','219','220','221',
'222','223','228','230','231', '232','233','234']
# subjects = [
# '100',]
output_dict = {}
for subject in tqdm(subjects):
output_dict[subject] = {}
# Read in the data
record = wfdb.rdrecord(data_path+subject)
annotation = wfdb.rdann(data_path+subject, 'atr')
if verbose:
# Print some meta informations
print('Sampling frequency used for this record:', record.fs)
print('Shape of loaded data array:', record.p_signal.shape)
print('Number of loaded annotations:', len(annotation.num))
# Get the ECG values from the file.
data = record.p_signal.transpose()
# Generate the classifications based on the annotations.
# -1.0 = undetermined
# 0.0 = normal
# 1.0 = abnormal
rate = np.zeros_like(annotation.symbol, dtype='float')
rate[~np.isin(annotation.symbol, normalbeats+realbeats)] = -1.0
rate[np.isin(annotation.symbol, realbeats)] = 1.0
rate[np.isin(annotation.symbol, normalbeats)] = 0.0
rates = np.zeros_like(data[0], dtype='float')
rates[annotation.sample] = rate
# Process each channel separately (2 per input file).
for channelid, channel in enumerate(data):
chname = record.sig_name[channelid]
# Resample from 360Hz to 125Hz
newsize = int((len(channel) * downsampled_sampling_rate / record.fs) + 0.5)
channel = scipy.signal.resample(channel, newsize)
if verbose:
print('Name', subject, 'ECG channel type:', chname)
output_dict[subject][chname] = {'data' : channel, 'labels' : rates}
return output_dict
def load_picc(data_path, verbose=True):
"""
https://physionet.org/content/challenge-2011/1.0.0/
data_path = path where the .dat, .hea, .atr files are located for every patient
load_method='beat_by_beat' either load and process data beat by beat, or by 'windows'
Note: Different featurizations may be used for each sqi
"""
# np.warnings.filterwarnings('error', category=np.VisibleDeprecationWarning)
normal_subjects = open(data_path+'set-a/RECORDS-acceptable', 'r').readlines()
normal_subjects = [subj.replace('\n','') for subj in normal_subjects]
normal_labels = [0.0 for _ in range(len(normal_subjects))]
abnormal_subjects = open(data_path+'set-a/RECORDS-unacceptable', 'r').readlines()
abnormal_subjects = [subj.replace('\n','') for subj in abnormal_subjects]
abnormal_labels = [1.0 for _ in range(len(abnormal_subjects))]
all_subjects = normal_subjects + abnormal_subjects
all_labels = normal_labels + abnormal_labels
output_dict = {}
for subject, label in tqdm(zip(all_subjects, all_labels)):
output_dict[subject] = {}
# Read in the data
record = wfdb.rdrecord(data_path+'set-a/'+subject)
# Print some meta informations
if verbose:
print('Sampling frequency used for this record:', record.fs)
print('Shape of loaded data array:', record.p_signal.shape)
# Get the ECG values from the file.
data = record.p_signal.transpose()
# Process each channel separately (12 per input file).
for channelid, channel in enumerate(data):
newsize = int((len(channel) * 125 / record.fs) + 0.5)
channel = scipy.signal.resample(channel, newsize)
chname = record.sig_name[channelid]
if verbose:
print('ECG channel type:', chname)
output_dict[subject][chname] = {'data':channel, 'label':label}
return output_dict
Functions
def add_noise(signal, method, target_snr_db=18, random_state=0, verbose=False)-
Adds noise of specified target_snr_db dB to signal
method = type of noise to add
Expand source code
def add_noise(signal, method, target_snr_db=18, random_state=0, verbose=False): """ Adds noise of specified target_snr_db dB to signal method = type of noise to add """ np.random.seed(random_state) # Adding noise using target SNR # Calculate signal power and convert to dB sig_avg_watts = get_power(signal) sig_avg_db = 10 * np.log10(sig_avg_watts) # Calculate noise according to [2] then convert to watts noise_avg_db = sig_avg_db - target_snr_db noise_avg_watts = 10 ** (noise_avg_db / 10) if method == 'gaussian': # Generate an sample of white noise noise_volts = np.random.normal(0, np.sqrt(noise_avg_watts), len(signal)) else: raise NotImplementedError if verbose: print('SNR', calc_snr(signal, noise_volts)) # Noise up the original signal return signal + noise_volts def add_noisy_signal(signal, noise, target_snr_db=18, verbose=False)-
Adds noise of specified target_snr_db dB to signal via rescaling
Expand source code
def add_noisy_signal(signal, noise, target_snr_db=18, verbose=False): """ Adds noise of specified target_snr_db dB to signal via rescaling """ # Calculate signal power and convert to dB sig_avg_watts = get_power(signal) sig_avg_db = 10 * np.log10(sig_avg_watts) # Calculate noise according to [2] then convert to watts noise_avg_db = sig_avg_db - target_snr_db noise_avg_watts = 10 ** (noise_avg_db / 10) # rescale noise noise -= np.mean(noise) noise *= np.sqrt(noise_avg_watts) / noise.std() if verbose: print('SNR', calc_snr(signal, noise)) # Noise up the original signal return signal + noise def calc_snr(signal, noise)-
" Calculates signal to noise ratio
Expand source code
def calc_snr(signal, noise): """" Calculates signal to noise ratio """ signal_avg_watts = get_power(signal) noise_avg_watts = get_power(noise) return 10*(np.log10(signal_avg_watts) - np.log10(noise_avg_watts)) def get_power(signal)-
" Helper function to return average power of signal
Expand source code
def get_power(signal): """" Helper function to return average power of signal """ return np.mean(np.power(signal, 2)) def load_ecg_beat_by_beat(channel, rates, sampling_rate, beat_window=90, show=False)-
channel = the entire raw ECG waveform of shape (length,) rates = the processed (numeric) annotations of same shape as ECG (length,) sampling_rate = the hz of the input ECG signal downsampled_sampling_rate = the hz to downsample to beat_window = the window, after downsampling, of indices to get around the peak show = a boolean value whether to show the obtained peaks
Expand source code
def load_ecg_beat_by_beat(channel, rates, sampling_rate, beat_window=90, show=False): """ channel = the entire raw ECG waveform of shape (length,) rates = the processed (numeric) annotations of same shape as ECG (length,) sampling_rate = the hz of the input ECG signal downsampled_sampling_rate = the hz to downsample to beat_window = the window, after downsampling, of indices to get around the peak show = a boolean value whether to show the obtained peaks """ # Instead of using the annotations to find the beats, we will # use R-peak detection instead. The reason for this is so that # the same logic can be used to analyze new and un-annotated # ECG data. We use the annotations here only to classify the # beat as either Normal or Abnormal and to train the model. # Find rpeaks in the ECG data. Most should match with # the annotations. # biosppy.signals.ecg.ecg returns: # names = ('ts', 'filtered', 'rpeaks', 'templates_ts', 'templates', # 'heart_rate_ts', 'heart_rate') out = biosppy.signals.ecg.ecg(signal=channel, sampling_rate=sampling_rate, show=show) # Split into individual heartbeats. For each heartbeat # record, append classification (normal/abnormal). labels = [] beats = [] # Skip first and last beat. for idxval in out['rpeaks'][1:-1]: # Get the classification value that is on # or near the position of the rpeak index. # set as abnormal beat if it exists if rates is not None: catval = rates[max(0,idxval-beat_window):idxval+beat_window].max() # Skip beat if there is no classification. if catval == -1.0: continue labels.append(catval) # Append some extra readings around the beat. beat = channel[max(0,idxval-beat_window):idxval+beat_window] if show: plt.plot(beat) plt.scatter(beat_window, 1, c='r', label='detected peak') plt.legend(loc='upper right'); plt.grid(); plt.ylabel('ECG'); plt.xlabel(str(sampling_rate)+' hz'); plt.show() # # Normalize the readings to a 0-1 range for ML purposes. # beat_range = beat.max() - beat.min() # if beat_range == 0: # continue # beat = (beat - beat.min()) / beat_range beats.append(beat) # return data and labels return beats, labels def load_ecg_by_windows(channel, rates, noise_threshold, window_size=7500, step=3750)-
channel = the entire raw ECG waveform of shape (length,) rates = the processed (numeric) annotations of same shape as ECG (length,) sampling_rate = the hz of the input ECG signal downsampled_sampling_rate = the hz to downsample to window_size = the window, after downsampling, of indices in a feature window
Expand source code
def load_ecg_by_windows(channel, rates, noise_threshold, window_size=60*125, step=30*125): """ channel = the entire raw ECG waveform of shape (length,) rates = the processed (numeric) annotations of same shape as ECG (length,) sampling_rate = the hz of the input ECG signal downsampled_sampling_rate = the hz to downsample to window_size = the window, after downsampling, of indices in a feature window """ # Split into individual heartbeats. For each heartbeat # record, append classification (normal/abnormal). labels = [] beats = [] # Skip first and last beat. for idxval in range(window_size, len(channel), step): # Get the classification value that is on # or near the position of the rpeak index. # set as abnormal beat if it exists if rates is not None: window_labels = rates[max(0,idxval-window_size):idxval] # Skip beat if there is no classification. if (window_labels==-1.0).sum() == len(window_labels): continue # print((window_labels==1.0).sum() / len(window_labels)) if ((window_labels==1.0).sum() / len(window_labels)) > noise_threshold: catval = 1 else: catval = 0 labels.append(catval) # Append some extra readings around the beat. beat = channel[max(0,idxval-window_size):idxval] # # Normalize the readings to a 0-1 range for ML purposes. # beat_range = beat.max() - beat.min() # if beat_range == 0: # continue # beat = (beat - beat.min()) / beat_range beats.append(beat) # return data and labels return beats, labels def load_mitdb(data_path, verbose=False, downsampled_sampling_rate=125)-
https://physionet.org/content/mitdb/1.0.0/
data_path = path where the .dat, .hea, .atr files are located for every patient
Expand source code
def load_mitdb(data_path, verbose=False, downsampled_sampling_rate=125): """ https://physionet.org/content/mitdb/1.0.0/ data_path = path where the .dat, .hea, .atr files are located for every patient """ realbeats = ['L','R','B','A','a','J','S','V','r', 'F','e','j','n','E','/','f','Q','?'] normalbeats = ['N'] # Loop through each input file. Each file contains one # record of ECG readings, sampled at 360 readings per # second. # np.warnings.filterwarnings('error', category=np.VisibleDeprecationWarning) subjects = [ '100','101','102','103','104', '105','106','107','108','109', '111','112','113','114','115', '116','117','118','119','121', '122','123','124','200','201', '202','203','205','207','208', '209','210','212','213','214', '215','217','219','220','221', '222','223','228','230','231', '232','233','234'] # subjects = [ # '100',] output_dict = {} for subject in tqdm(subjects): output_dict[subject] = {} # Read in the data record = wfdb.rdrecord(data_path+subject) annotation = wfdb.rdann(data_path+subject, 'atr') if verbose: # Print some meta informations print('Sampling frequency used for this record:', record.fs) print('Shape of loaded data array:', record.p_signal.shape) print('Number of loaded annotations:', len(annotation.num)) # Get the ECG values from the file. data = record.p_signal.transpose() # Generate the classifications based on the annotations. # -1.0 = undetermined # 0.0 = normal # 1.0 = abnormal rate = np.zeros_like(annotation.symbol, dtype='float') rate[~np.isin(annotation.symbol, normalbeats+realbeats)] = -1.0 rate[np.isin(annotation.symbol, realbeats)] = 1.0 rate[np.isin(annotation.symbol, normalbeats)] = 0.0 rates = np.zeros_like(data[0], dtype='float') rates[annotation.sample] = rate # Process each channel separately (2 per input file). for channelid, channel in enumerate(data): chname = record.sig_name[channelid] # Resample from 360Hz to 125Hz newsize = int((len(channel) * downsampled_sampling_rate / record.fs) + 0.5) channel = scipy.signal.resample(channel, newsize) if verbose: print('Name', subject, 'ECG channel type:', chname) output_dict[subject][chname] = {'data' : channel, 'labels' : rates} return output_dict def load_nstdb_extra(nstdb_path, mitdb_path, verbose=False, downsampled_sampling_rate=125)-
https://physionet.org/content/nstdb/1.0.0/
1. Install the WFDB package: https://www.physionet.org/content/wfdb/
Source: Moody, G., Pollard, T., & Moody, B. (2021). WFDB Software Package (version 10.6.2). PhysioNet. https://doi.org/10.13026/zzpx-h016.
Source: Goldberger, A., Amaral, L., Glass, L., Hausdorff, J., Ivanov, P. C., Mark, R., … & Stanley, H. E. (2000). PhysioBank, PhysioToolkit, and PhysioNet:
Components of a new research resource for complex physiologic signals. Circulation [Online]. 101 (23), pp. e215–e220.
2. Download the MIT-BIH Noise Stress Test Database: https://physionet.org/content/nstdb/ and rename it to "nstdb"
Source: Moody GB, Muldrow WE, Mark RG. A noise stress test for arrhythmia detectors. Computers in Cardiology 1984; 11:381-384.
3. Download the MIT-BIH: https://physionet.org/content/mitdb/ and rename it to "mitdb"
Source: Moody GB, Mark RG. The impact of the MIT-BIH Arrhythmia Database. IEEE Eng in Med and Biol 20(3):45-50 (May-June 2001). (PMID: 11446209)
4. Run "bash generate_nstdb_extra.sh" to create the nstdb_extra directory
This takes in 2 clean ECGs (118 and 119) and adds 3 types of noise (baseline wander, muscle EMG artifact, electrode motion artifact)
at different signal-to-noise ratios (24,18,12,6,0,-6) in dB. Original script only adds one type of noise, this script adds all 3 types.
Source: https://physionet.org/content/nstdb/1.0.0/nstdbgen
Source: https://physionet.org/physiotools/wag/nst-1.htm
data_path = path where the .dat, .hea, .atr files are located for every patient
Expand source code
def load_nstdb_extra(nstdb_path, mitdb_path, verbose=False, downsampled_sampling_rate=125): """ https://physionet.org/content/nstdb/1.0.0/ ## 1. Install the WFDB package: https://www.physionet.org/content/wfdb/ ## Source: Moody, G., Pollard, T., & Moody, B. (2021). WFDB Software Package (version 10.6.2). PhysioNet. https://doi.org/10.13026/zzpx-h016. ## Source: Goldberger, A., Amaral, L., Glass, L., Hausdorff, J., Ivanov, P. C., Mark, R., ... & Stanley, H. E. (2000). PhysioBank, PhysioToolkit, and PhysioNet: ## Components of a new research resource for complex physiologic signals. Circulation [Online]. 101 (23), pp. e215–e220. ## ## 2. Download the MIT-BIH Noise Stress Test Database: https://physionet.org/content/nstdb/ and rename it to "nstdb" ## Source: Moody GB, Muldrow WE, Mark RG. A noise stress test for arrhythmia detectors. Computers in Cardiology 1984; 11:381-384. ## ## 3. Download the MIT-BIH: https://physionet.org/content/mitdb/ and rename it to "mitdb" ## Source: Moody GB, Mark RG. The impact of the MIT-BIH Arrhythmia Database. IEEE Eng in Med and Biol 20(3):45-50 (May-June 2001). (PMID: 11446209) ## ## 4. Run "bash generate_nstdb_extra.sh" to create the nstdb_extra directory ## This takes in 2 clean ECGs (118 and 119) and adds 3 types of noise (baseline wander, muscle EMG artifact, electrode motion artifact) ## at different signal-to-noise ratios (24,18,12,6,0,-6) in dB. Original script only adds one type of noise, this script adds all 3 types. ## ## Source: https://physionet.org/content/nstdb/1.0.0/nstdbgen ## Source: https://physionet.org/physiotools/wag/nst-1.htm ## data_path = path where the .dat, .hea, .atr files are located for every patient """ subjects = ['118', '119'] output_dict = {} for subject in subjects: # Read in the data record = wfdb.rdrecord(mitdb_path+subject) data = record.p_signal.transpose() for noise_type in ['em', 'ma', 'bw', 'gn']: for dB in ['_6','00','06','12','18','24']: int_db = int(dB.replace('_','-')) subject_str = subject+noise_type+dB output_dict[subject_str] = {} # Process each channel separately (2 per input file). for channelid, channel in enumerate(data): chname = record.sig_name[channelid] # Resample from 360Hz to 125Hz newsize = int((len(channel) * downsampled_sampling_rate / record.fs) + 0.5) channel = scipy.signal.resample(channel, newsize) if verbose: print('Name',subject_str,'ECG channel type:',chname) if noise_type in ['em', 'ma', 'bw']: noise_record = wfdb.rdrecord(nstdb_path+noise_type) noise_data = noise_record.p_signal.transpose() noise_channel = noise_data[channelid%2] # alternate channels of noise to add, per literature newsize = int((len(noise_channel) * downsampled_sampling_rate / record.fs) + 0.5) noise_channel = scipy.signal.resample(noise_channel, newsize) output_dict[subject_str][chname] = {'data' : add_noisy_signal(channel, noise_channel, target_snr_db=int_db), 'labels' : None} elif noise_type=='gn': output_dict[subject_str][chname] = {'data' : add_noise(signal=channel, method='gaussian', target_snr_db=int_db), 'labels' : None} # noise_type=='all': noise_type = 'all' for dB in ['_6','00','06','12','18','24']: int_db = int(dB.replace('_','-')) subject_str = subject+noise_type+dB output_dict[subject_str] = {} # Process each channel separately (2 per input file). for channelid, channel in enumerate(data): chname = record.sig_name[channelid] # Resample from 360Hz to 125Hz newsize = int((len(channel) * downsampled_sampling_rate / record.fs) + 0.5) channel = scipy.signal.resample(channel, newsize) all_noise = np.zeros_like(channel) for noise_type_ in ['em', 'ma', 'bw', 'gn']: all_noise += (output_dict[subject+noise_type_+dB][chname]['data'] - channel) output_dict[subject_str][chname] = {'data' : add_noisy_signal(channel, all_noise, target_snr_db=int_db), 'labels' : None} return output_dict def load_picc(data_path, verbose=True)-
https://physionet.org/content/challenge-2011/1.0.0/
data_path = path where the .dat, .hea, .atr files are located for every patient load_method='beat_by_beat' either load and process data beat by beat, or by 'windows'
Note: Different featurizations may be used for each sqi
Expand source code
def load_picc(data_path, verbose=True): """ https://physionet.org/content/challenge-2011/1.0.0/ data_path = path where the .dat, .hea, .atr files are located for every patient load_method='beat_by_beat' either load and process data beat by beat, or by 'windows' Note: Different featurizations may be used for each sqi """ # np.warnings.filterwarnings('error', category=np.VisibleDeprecationWarning) normal_subjects = open(data_path+'set-a/RECORDS-acceptable', 'r').readlines() normal_subjects = [subj.replace('\n','') for subj in normal_subjects] normal_labels = [0.0 for _ in range(len(normal_subjects))] abnormal_subjects = open(data_path+'set-a/RECORDS-unacceptable', 'r').readlines() abnormal_subjects = [subj.replace('\n','') for subj in abnormal_subjects] abnormal_labels = [1.0 for _ in range(len(abnormal_subjects))] all_subjects = normal_subjects + abnormal_subjects all_labels = normal_labels + abnormal_labels output_dict = {} for subject, label in tqdm(zip(all_subjects, all_labels)): output_dict[subject] = {} # Read in the data record = wfdb.rdrecord(data_path+'set-a/'+subject) # Print some meta informations if verbose: print('Sampling frequency used for this record:', record.fs) print('Shape of loaded data array:', record.p_signal.shape) # Get the ECG values from the file. data = record.p_signal.transpose() # Process each channel separately (12 per input file). for channelid, channel in enumerate(data): newsize = int((len(channel) * 125 / record.fs) + 0.5) channel = scipy.signal.resample(channel, newsize) chname = record.sig_name[channelid] if verbose: print('ECG channel type:', chname) output_dict[subject][chname] = {'data':channel, 'label':label} return output_dict