Module signal_quality.sqis

Expand source code
import numpy as np
import biosppy
import pyhrv
import matplotlib.pyplot as plt
import scipy.stats
import neurokit2 as nk
import antropy
import sklearn

### ===================================================== Relevant ECG Features =====================================================

def orphanidou2015_sqi(ecg_cleaned, sampling_rate, show=False):
    """ Implementation of template matching approach introduced by Orphanidou et al. 
    Returns the average correlation coefficient (scipy.stats.pearsonr) of the QRS waveforms that ranges from -1 to 1

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The hz of the input ECG signal
    show : bool
         Flag whether to show the obtained peaks

    Reference
    ----------
    C. Orphanidou, T. Bonnici, P. Charlton, D. Clifton, D. Vallance and L. Tarassenko, 
    "Signal quality indices for the electrocardiogram and photoplethysmogram: Derivation and applications to wireless monitoring", 
    IEEE J. Biomed. Health Informat., vol. 19, no. 3, pp. 832-838, May 2015.
    """
    
    try:
        ## out = ('ts', 'filtered', 'rpeaks', 'templates_ts', 'templates', 'heart_rate_ts', 'heart_rate')
        out = biosppy.signals.ecg.ecg(signal=ecg_cleaned, sampling_rate=sampling_rate, show=show)
    except Exception as e:
        return np.nan

    nni = pyhrv.tools.nn_intervals(rpeaks=out['rpeaks'])
    ## nni is in ms, convert to s
    nni = nni / 1000

    ## obtain median rr interval
    median_qrs_window = np.median(out['rpeaks'][1:] - out['rpeaks'][:-1]).astype(int)

    ## check heart rate in reasonable range of [40,180]
    if np.any(out['heart_rate'] < 40) or np.any(180 < out['heart_rate']):
        return 1.

    ## if all nni are less than 3 seconds
    if np.any(nni > 3):
        return 1.

    ## check max_rr_interval / min_rr_interval < 2.2
    if (np.max(nni) / np.min(nni)) > 2.2:
        return 1.

    templates = np.array([
        ecg_cleaned[r_peak-median_qrs_window//2:r_peak+median_qrs_window//2] 
        for r_peak in out['rpeaks']
        if (r_peak-median_qrs_window//2 >= 0) and (r_peak+median_qrs_window//2 < len(ecg_cleaned))
    ])
    
    average_template = np.mean(templates, axis=0)

    ## scipy.stats.pearsonr returns r, p_value
    corrcoefs = [
        scipy.stats.pearsonr(x=templates[i], y=average_template)[0]
        for i in range(len(templates))
        ]

    return np.mean(corrcoefs)

def averageQRS_sqi(ecg_cleaned, sampling_rate):
    """Computes a continuous index of quality of the ECG signal by interpolating the distance
    of each QRS segment from the average QRS segment present in the data. This index is relative: 
    1 corresponds to heartbeats that are the closest to the average sample and 0 corresponds to 
    the most distant heartbeat from that average sample. Note that 1 does not necessarily means
    "good": if the majority of samples are bad, than being close to the average will likely mean 
    bad as well. Use this index with care and plot it alongside your ECG signal to see if it makes sense.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The hz of the input ECG signal

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    try:
        rating = nk.ecg_quality(ecg_cleaned=ecg_cleaned, rpeaks=None, sampling_rate=sampling_rate, method="averageQRS")

        if rating == "Excellent":
            return 2
        elif rating == "Unnacceptable":
            return 0
        else:
            return 1

    except Exception as e:
        # print(e)
        return np.nan

def zhao2018_sqi(ecg_cleaned, sampling_rate):
    """Returns the zhao2018 output for ECG signal quality prediction. 
    The metod extracts several signal quality indexes (sqis):
    QRS wave power spectrum distribution psqi, kurtosis ksqi, and baseline relative power bassqi.
    An additional R peak detection match qsqi was originally computed in the paper but left out
    in this algorithm. The indices were originally weighted with a ratio of [0.4, 0.4, 0.1, 0.1] to
    generate the final classification outcome, but because qsqi was dropped,
    the weights have been rearranged to [0.6, 0.2, 0.2] for psqi, ksqi and bassqi respectively

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    try:
        rating = nk.ecg_quality(ecg_cleaned=ecg_cleaned, rpeaks=None, sampling_rate=sampling_rate, method="zhao2018", approach='fuzzy')
        if rating == "Excellent":
            return 2
        elif rating == "Unnacceptable":
            return 0
        else:
            return 1
    except Exception as e:
        # print(e)
        return np.nan


def p_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[5, 15], dem_spectrum=[5, 40]):
    """Returns a sqi based off of the Power Spectrum Distribution of QRS Waveform.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).
    window : int
        Length of each window in seconds. See `signal_psd()`.

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    try:
        psd = nk.signal_power(
            ecg_cleaned,
            sampling_rate=sampling_rate,
            frequency_band=[num_spectrum, dem_spectrum],
            method="welch",
            normalize=False,
            window=window
            )

        num_power = psd.iloc[0][0]
        dem_power = psd.iloc[0][1]

        return num_power / dem_power
    except Exception as e:
        return np.nan


def bas_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[0, 1], dem_spectrum=[0, 40]):
    """Returns a sqi that measures Relative Power in the baseline.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).
    window : int
        Length of each window in seconds. See `signal_psd()`.

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """

    try:
        psd = nk.signal_power(
            ecg_cleaned,
            sampling_rate=sampling_rate,
            frequency_band=[num_spectrum, dem_spectrum],
            method="welch",
            normalize=False,
            window=window
            )

        num_power = psd.iloc[0][0]
        dem_power = psd.iloc[0][1]

        return 1 - num_power / dem_power
    except Exception as e:
        return np.nan

def c_sqi(ecg_cleaned, sampling_rate):
    """Returns a sqi that measures variability in the R-R Interval. 
    When an artifact is present, the QRS detector underperforms by either
    missing R-peaks or erroneously identifying noisy peaks as Rpeaks. The
    above two problems will lead to a high degree of variability in the
    distribution of R-R intervals.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: https://github.com/Aura-healthcare/ecg_qc/blob/main/ecg_qc/sqi_computing/sqi_rr_intervals.py
    """
    try:
        rri_list = biosppy.signals.ecg.hamilton_segmenter(signal=ecg_cleaned, sampling_rate=sampling_rate)[0]
        c_sqi_score = np.std(rri_list, ddof=1) / np.mean(rri_list)

    except Exception:
        c_sqi_score = np.nan

    return c_sqi_score

def _get_num_matching(qrs_frames_1, qrs_frames_2, frame_tolerance):
    """Helper function to determine number of matching Rpeaks 
    """
    ## Catch complete failed QRS detection
    if (len(qrs_frames_1) == 0 or len(qrs_frames_2) == 0):
        return 0

    i = 0
    j = 0
    matching_frames = 0

    while i < len(qrs_frames_1) and j < len(qrs_frames_2):
        min_qrs_frame = min(qrs_frames_1[i], qrs_frames_2[j])
        ## Get missing detected beats intervals
        ## Matching frames
        if abs(qrs_frames_2[j] - qrs_frames_1[i]) < frame_tolerance:
            matching_frames += 1
            i += 1
            j += 1
        else:
            ## increment first QRS in frame list
            if min_qrs_frame == qrs_frames_1[i]:
                i += 1
            else:
                j += 1
    return matching_frames

def q_sqi(ecg_cleaned, sampling_rate, matching_qrs_frames_tolerance=50, method='q_sqi'):
    """Returns a sqi that measures matching Degree of R Peak Detection. 
    Two R wave detection algorithms are compared with their respective number
    of R waves detected (Hamilton vs Stationary Wavelet Transform).
    
    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: https://github.com/Aura-healthcare/ecg_qc/blob/main/ecg_qc/sqi_computing/sqi_rr_intervals.py
    """

    ## returns signals: df, info: dict of {'ECG_R_Peaks', 'sampling_rate'}
    if method=='q_sqi':
        qrs_frames_1 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='hamilton2002')[1]['ECG_R_Peaks']
        qrs_frames_2 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
        ## compute_qrs_frames_correlation
        # single_frame_duration = 1/sampling_rate
        frame_tolerance = matching_qrs_frames_tolerance * 0.001 * sampling_rate
    elif method=='b_sqi':
        qrs_frames_1 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='zong2003')[1]['ECG_R_Peaks']
        qrs_frames_2 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='pantompkins1985')[1]['ECG_R_Peaks']
        ## compute_qrs_frames_correlation (150 ms)
        frame_tolerance = .15*sampling_rate
    else:
        raise NotImplementedError

    ## Catch complete failed QRS detection

    matching_frames = _get_num_matching(qrs_frames_1, qrs_frames_2, frame_tolerance)

    if method=='q_sqi':
        if (len(qrs_frames_1) == 0 or len(qrs_frames_2) == 0): # error
            return 0.
        else:
            return 2 * matching_frames / (len(qrs_frames_1) + len(qrs_frames_2))
    elif method=='b_sqi':
        if (len(qrs_frames_1) + len(qrs_frames_2) - matching_frames) == 0: # error
            return 0.
        else:
            return matching_frames / (len(qrs_frames_1) + len(qrs_frames_2) - matching_frames)

def i_sqi(ecg_cleaned_list, sampling_rate):
    """Returns a sqi that measures inter-channel signal quality. 
    Calculated as the ratio of the number of matched beats (Nmatched) to 
    all detected beats (Nall) between a given lead and all other synchronous ECG.

    Parameters
    ----------
    ecg_cleaned_list : List of np.array
        List of cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).

    Reference
    ----------
    Li, Qiao, Roger G. Mark, and Gari D. Clifford. 
    "Robust heart rate estimation from multiple asynchronous noisy sources using signal quality indices and a Kalman filter." 
    Physiological measurement 29.1 (2007): 15.    
    """
    qrs_frames_list = [nk.ecg_peaks(ecg_cleaned, sampling_rate, method='hamilton2002')[1]['ECG_R_Peaks'] for ecg_cleaned in ecg_cleaned_list]
    frame_tolerance = .15*sampling_rate
    
    max_matching = 0
    for i in range(len(qrs_frames_list)):
        for j in range(i+1, len(qrs_frames_list)):
            matching = _get_num_matching(qrs_frames_list[i], qrs_frames_list[j], frame_tolerance)
            max_matching = max(max_matching, matching)

    return max_matching

def bs_sqi(ecg_cleaned, peaks, sampling_rate):
    """Returns a sqi for baseline wander check in time domain. The higher the wander, the lower the bs_sqi.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """

    # peaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    filtered = nk.signal_filter(signal=ecg_cleaned, sampling_rate=sampling_rate, highcut=1, method='butterworth')
    
    total = []
    
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_cleaned[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_cleaned))]
        Ra = np.max(window) - np.min(window)

        # Ba is peak to peak amplitude of baseline (1hz lowpass filter) from (R-1s, R+1s)
        window = filtered[max(int(peak-1*sampling_rate), 0) : min(int(peak+1*sampling_rate), len(ecg_cleaned))]
        Ba = np.max(window) - np.min(window)

        total.append(Ra / Ba)
    
    total = np.nanmean(total)
    return total

def e_sqi(ecg_cleaned, peaks, sampling_rate):
    """ Returns a sqi based off of the sum of energy of detected QRS waveforms over energy of entire signal.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """
    total = 0
    
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_cleaned[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_cleaned))]    
        # energy of QRS
        total += np.dot(window, window)

    return total / np.dot(ecg_cleaned, ecg_cleaned)

def hf_sqi(ecg_raw, peaks, sampling_rate):
    """ Returns a sqi based on the relative amplitude of high frequency noise.

    Parameters
    ----------
    ecg_raw : np.array
        Input unfiltered ECG signal
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """
    if len(ecg_raw) < 6: return np.nan

    ## integer coefficients high pass filter y(j) = x(j) − 2x(j − 1) + x(j − 2)
    y = ecg_raw[:-2] - 2*ecg_raw[1:-1] + ecg_raw[2:]
    s = y[:-5] + y[1:-4] + y[2:-3] + y[3:-2] + y[4:-1] + y[5:]
    
    total = []
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_raw[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_raw))]    
        # energy of QRS

        Ra = np.max(window) - np.min(window)

        window = s[max(int(peak-0.28*sampling_rate), 0) : min(int(peak-0.05*sampling_rate), len(s))]
        H = np.nanmean(window)

        total.append(Ra / H)
    return np.nanmean(total)

def rsd_sqi(ecg_cleaned, peaks, sampling_rate):
    """ Returns a sqi based on the relative standard deviation.

    Parameters
    ----------
    ecg_raw : np.array
        Input unfiltered ECG signal
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """
    
    total = []
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_cleaned[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_cleaned))]    
        # energy of QRS
        sigma_r = np.nanstd(window)

        window = ecg_cleaned[max(int(peak-0.2*sampling_rate), 0) : min(int(peak+0.2*sampling_rate), len(ecg_cleaned))]
        sigma_a = np.nanstd(window)
        
        total.append(sigma_r / (sigma_a*2))
    return np.nanmean(total)

def get_ecg_sqis(ecg_raw, ecg_cleaned, peaks, sampling_rate, window):
    """ Returns all ECG sqis.

    Parameters
    ----------
    ecg_raw : np.array
        Input unfiltered ECG signal
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency
    window : int
        Length of each window in seconds. See `signal_psd()`.

    Returns
    ----------
    List : list
        List of 11 implemented single channel ECG sqis and 10 time series sqis
        for a total of 22 features 
    """
    # ecg_cleaned = nk.ecg_clean(ecg_raw, sampling_rate=sampling_rate, method="neurokit")
    # peaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']

    ecg_sqis = [
        orphanidou2015_sqi(ecg_cleaned, sampling_rate, show=False),
        averageQRS_sqi(ecg_cleaned, sampling_rate),
        zhao2018_sqi(ecg_cleaned, sampling_rate),
        p_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[5, 15], dem_spectrum=[5, 40]),
        bas_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[0, 1], dem_spectrum=[0, 40]),
        c_sqi(ecg_cleaned, sampling_rate),
        q_sqi(ecg_cleaned, sampling_rate, matching_qrs_frames_tolerance=50),
        q_sqi(ecg_cleaned, sampling_rate, method='b_sqi'),
        bs_sqi(ecg_cleaned, peaks, sampling_rate),
        e_sqi(ecg_cleaned, peaks, sampling_rate),
        hf_sqi(ecg_raw, peaks, sampling_rate),
        rsd_sqi(ecg_cleaned, peaks, sampling_rate)
    ]

    generic_sqis = get_generic_sqis(signal=ecg_raw)
    return ecg_sqis + generic_sqis

### ===================================================== Relevant Pleth Features =====================================================

def perfusion_sqi(pleth_raw, pleth_cleaned):
    """Returns perfusion of Pleth. The perfusion index is the ratio of the pulsatile blood flow to the nonpulsatile 
    or static blood in peripheral tissue. In other words, it is the difference of the 
    amount of light absorbed through the pulse of when light is transmitted through 
    the finger. It is calculated as AC/DC * 100

    Parameters
    ----------
    pleth_raw : np.array
        Input unfiltered Pleth signal
    pleth_cleaned : np.array
        Input filtered Pleth signal
    """
    try:
        return (np.nanmax(pleth_cleaned) - np.nanmin(pleth_cleaned)) / np.nanmean(pleth_raw) * 100
    except Exception as e:
        return np.nan    

def get_pleth_sqis(pleth_raw, pleth_cleaned):
    """ Returns all Pleth sqis.

    Parameters
    ----------
    pleth_raw : np.array
        Input unfiltered Pleth signal
    pleth_cleaned : np.array
        Input filtered Pleth signal
    
    Returns
    ----------
    List : list
        List of 1 implemented single channel Pleth sqi and 10 time series sqis
        for a total of 11 features 
    """
    # pleth_cleaned = nk.ppg_clean(ppg_signal=pleth_raw, sampling_rate=sampling_rate, method='elgendi')
    pleth_sqis = [
        perfusion_sqi(pleth_raw, pleth_cleaned),
    ]

    generic_sqis = get_generic_sqis(signal=pleth_raw)
    return pleth_sqis + generic_sqis

### ===================================================== Relevant General Time Series Features =====================================================

def k_sqi(signal, kurtosis_method='fisher'):
    """Return the kurtosis of the signal, with Fisher's or Pearson's method.

    Parameters
    ----------
    signal : np.array
        The input signal
    kurtosis_method : str
        Compute kurtosis (ksqi) based on "fisher" (default) or "pearson" definition.

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    if kurtosis_method == "fisher":
        return scipy.stats.kurtosis(signal, fisher=True)
    elif kurtosis_method == "pearson":
        return scipy.stats.kurtosis(signal, fisher=False)

def s_sqi(signal):
    """Return the skewness of the signal.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return scipy.stats.skew(signal)

def pur_sqi(signal):
    """ Returns the signal purity of the input. 
    In the case of a periodic signal with a single dominant frequency, 
    it takes the value of one and approaches zero for non-sinusoidal noisy signals.
    antropy.hjorth_params returns 2 floats: (mobility, complexity).
    Complexity is the value we want.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return antropy.hjorth_params(signal)[1]

def ent_sqi(signal):
    """ Returns the sample entropy of the signal.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return antropy.sample_entropy(signal)

def pca_sqi(signal):
    """ Returns a PCA sqi of the input signals.

    Parameters 
    ----------
    signal : array-like of shape (n_samples, n_features)
        Multivariate time-series, shape is at least 2 dimensional
    """
    # todo: Currently, we are only using single channel (1d) swis
    pca = sklearn.decomposition.PCA(n_components=None)
    pca.fit(signal)

    return np.sum(pca.singular_values_[:5]) / np.sum(pca.singular_values_)

def autocorr_sqi(signal, lag):
    """Calculates the autocorrelation of the specified lag, according to the formula in https://en.wikipedia.org/wiki/Autocorrelation#Estimation

    Parameters
    ----------
    signal : np.array
        The input signal
    lag : int
        The lag to use forthe autocorrelation calculation of the signal

    Reference
    ----------    
    source: https://tsfresh.readthedocs.io/en/latest/_modules/tsfresh/feature_extraction/feature_calculators.html#autocorrelation
    """
    # This is important: If a series is passed, the product below is calculated
    # based on the index, which corresponds to squaring the series.

    if len(signal) < lag:
        return np.nan
    # Slice the relevant subseries based on the lag
    y1 = signal[: (len(signal) - lag)]
    y2 = signal[lag:]
    # Subtract the mean of the whole series x
    x_mean = np.mean(signal)
    # The result is sometimes referred to as "covariation"
    sum_product = np.sum((y1 - x_mean) * (y2 - x_mean))
    # Return the normalized unbiased covariance
    v = np.var(signal)
    if np.isclose(v, 0):
        return np.nan
    else:
        return sum_product / ((len(signal) - lag) * v)

def zc_sqi(signal):
    """Returns the zero crossing rate.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return antropy.num_zerocross(signal)

def snr_sqi(signal_raw, signal_cleaned):
    """Returns the signal to noise ratio (SNR). There are many ways to define SNR, here, we use std of filtered vs std of raw signal.

    Parameters
    ----------
    signal_raw : np.array
        Raw input signal
    signal_cleaned : np.array    
        Cleaned input signal
    """
    return np.std(np.abs(signal_cleaned)) / np.std(np.abs(signal_raw))

def f_sqi(signal, window_size=3, threshold=1e-7):
    """Returns an sqi that computes percentage of flatness in the signal. 
    Constant values over a longer period (flat line) may be caused by sensor failures.

    Parameters
    ----------
    signal : np.array
        The input signal
    window_size : int
        Window to detect flat line, larger values will lower detection sensitivity
    threshold : float
        Threshold of flatness. I.e. Where (max-min) is considered equivalent  

    Reference
    ----------
    Source: https://github.com/DHI/tsod/blob/main/tsod/detectors.py
    """
    if window_size >= len(signal): return 0

    rolling = np.lib.stride_tricks.sliding_window_view(signal, window_shape=window_size)
    rollmax = np.nanmax(rolling, axis=1)
    rollmin = np.nanmin(rolling, axis=1)
    
    anomalies = rollmax - rollmin < threshold
    anomalies[0] = False  # first element cannot be determined
    anomalies[-1] = False

    idx = np.where(anomalies)[0]
    if idx is not None:
        # assuming window size = 3
        # remove also points before and after each detected anomaly
        anomalies[idx[idx > 0] - 1] = True
        maxidx = len(anomalies) - 1
        anomalies[idx[idx < maxidx] + 1] = True

    return np.sum(anomalies) / len(anomalies)

def get_generic_sqis(signal):
    """ Returns sqis for a generic signal.

    Parameters
    ----------
    signal : np.array
        The input signal

    Returns
    ----------
    List : list
        List of 10 time series sqis
    """
    return [
        k_sqi(signal, kurtosis_method='fisher'),
        s_sqi(signal),
        pur_sqi(signal),
        ent_sqi(signal),
        zc_sqi(signal),
        f_sqi(signal, window_size=3, threshold=1e-7),
        np.nanmean(signal),
        np.nanstd(signal),
        np.nanmax(signal),
        np.nanmin(signal)
    ]

Functions

def autocorr_sqi(signal, lag)

Calculates the autocorrelation of the specified lag, according to the formula in https://en.wikipedia.org/wiki/Autocorrelation#Estimation

Parameters

signal : np.array
The input signal
lag : int
The lag to use forthe autocorrelation calculation of the signal

Reference

source: https://tsfresh.readthedocs.io/en/latest/_modules/tsfresh/feature_extraction/feature_calculators.html#autocorrelation

Expand source code
def autocorr_sqi(signal, lag):
    """Calculates the autocorrelation of the specified lag, according to the formula in https://en.wikipedia.org/wiki/Autocorrelation#Estimation

    Parameters
    ----------
    signal : np.array
        The input signal
    lag : int
        The lag to use forthe autocorrelation calculation of the signal

    Reference
    ----------    
    source: https://tsfresh.readthedocs.io/en/latest/_modules/tsfresh/feature_extraction/feature_calculators.html#autocorrelation
    """
    # This is important: If a series is passed, the product below is calculated
    # based on the index, which corresponds to squaring the series.

    if len(signal) < lag:
        return np.nan
    # Slice the relevant subseries based on the lag
    y1 = signal[: (len(signal) - lag)]
    y2 = signal[lag:]
    # Subtract the mean of the whole series x
    x_mean = np.mean(signal)
    # The result is sometimes referred to as "covariation"
    sum_product = np.sum((y1 - x_mean) * (y2 - x_mean))
    # Return the normalized unbiased covariance
    v = np.var(signal)
    if np.isclose(v, 0):
        return np.nan
    else:
        return sum_product / ((len(signal) - lag) * v)
def averageQRS_sqi(ecg_cleaned, sampling_rate)

Computes a continuous index of quality of the ECG signal by interpolating the distance of each QRS segment from the average QRS segment present in the data. This index is relative: 1 corresponds to heartbeats that are the closest to the average sample and 0 corresponds to the most distant heartbeat from that average sample. Note that 1 does not necessarily means "good": if the majority of samples are bad, than being close to the average will likely mean bad as well. Use this index with care and plot it alongside your ECG signal to see if it makes sense.

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
sampling_rate : int
The hz of the input ECG signal

Reference

Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py

Expand source code
def averageQRS_sqi(ecg_cleaned, sampling_rate):
    """Computes a continuous index of quality of the ECG signal by interpolating the distance
    of each QRS segment from the average QRS segment present in the data. This index is relative: 
    1 corresponds to heartbeats that are the closest to the average sample and 0 corresponds to 
    the most distant heartbeat from that average sample. Note that 1 does not necessarily means
    "good": if the majority of samples are bad, than being close to the average will likely mean 
    bad as well. Use this index with care and plot it alongside your ECG signal to see if it makes sense.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The hz of the input ECG signal

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    try:
        rating = nk.ecg_quality(ecg_cleaned=ecg_cleaned, rpeaks=None, sampling_rate=sampling_rate, method="averageQRS")

        if rating == "Excellent":
            return 2
        elif rating == "Unnacceptable":
            return 0
        else:
            return 1

    except Exception as e:
        # print(e)
        return np.nan
def bas_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[0, 1], dem_spectrum=[0, 40])

Returns a sqi that measures Relative Power in the baseline.

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
sampling_rate : int
The sampling frequency of the signal (in Hz, i.e., samples/second).
window : int
Length of each window in seconds. See signal_psd().

Reference

Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py

Expand source code
def bas_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[0, 1], dem_spectrum=[0, 40]):
    """Returns a sqi that measures Relative Power in the baseline.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).
    window : int
        Length of each window in seconds. See `signal_psd()`.

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """

    try:
        psd = nk.signal_power(
            ecg_cleaned,
            sampling_rate=sampling_rate,
            frequency_band=[num_spectrum, dem_spectrum],
            method="welch",
            normalize=False,
            window=window
            )

        num_power = psd.iloc[0][0]
        dem_power = psd.iloc[0][1]

        return 1 - num_power / dem_power
    except Exception as e:
        return np.nan
def bs_sqi(ecg_cleaned, peaks, sampling_rate)

Returns a sqi for baseline wander check in time domain. The higher the wander, the lower the bs_sqi.

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
peaks : list
List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
sampling_frequency : int
Input ecg sampling frequency

Reference

Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. "A machine learning approach to multi-level ECG signal quality classification." Computer methods and programs in biomedicine 117.3 (2014): 435-447.

Expand source code
def bs_sqi(ecg_cleaned, peaks, sampling_rate):
    """Returns a sqi for baseline wander check in time domain. The higher the wander, the lower the bs_sqi.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """

    # peaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    filtered = nk.signal_filter(signal=ecg_cleaned, sampling_rate=sampling_rate, highcut=1, method='butterworth')
    
    total = []
    
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_cleaned[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_cleaned))]
        Ra = np.max(window) - np.min(window)

        # Ba is peak to peak amplitude of baseline (1hz lowpass filter) from (R-1s, R+1s)
        window = filtered[max(int(peak-1*sampling_rate), 0) : min(int(peak+1*sampling_rate), len(ecg_cleaned))]
        Ba = np.max(window) - np.min(window)

        total.append(Ra / Ba)
    
    total = np.nanmean(total)
    return total
def c_sqi(ecg_cleaned, sampling_rate)

Returns a sqi that measures variability in the R-R Interval. When an artifact is present, the QRS detector underperforms by either missing R-peaks or erroneously identifying noisy peaks as Rpeaks. The above two problems will lead to a high degree of variability in the distribution of R-R intervals.

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
sampling_frequency : int
Input ecg sampling frequency

Reference

Source: https://github.com/Aura-healthcare/ecg_qc/blob/main/ecg_qc/sqi_computing/sqi_rr_intervals.py

Expand source code
def c_sqi(ecg_cleaned, sampling_rate):
    """Returns a sqi that measures variability in the R-R Interval. 
    When an artifact is present, the QRS detector underperforms by either
    missing R-peaks or erroneously identifying noisy peaks as Rpeaks. The
    above two problems will lead to a high degree of variability in the
    distribution of R-R intervals.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: https://github.com/Aura-healthcare/ecg_qc/blob/main/ecg_qc/sqi_computing/sqi_rr_intervals.py
    """
    try:
        rri_list = biosppy.signals.ecg.hamilton_segmenter(signal=ecg_cleaned, sampling_rate=sampling_rate)[0]
        c_sqi_score = np.std(rri_list, ddof=1) / np.mean(rri_list)

    except Exception:
        c_sqi_score = np.nan

    return c_sqi_score
def e_sqi(ecg_cleaned, peaks, sampling_rate)

Returns a sqi based off of the sum of energy of detected QRS waveforms over energy of entire signal.

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
peaks : list
List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
sampling_frequency : int
Input ecg sampling frequency

Reference

Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. "A machine learning approach to multi-level ECG signal quality classification." Computer methods and programs in biomedicine 117.3 (2014): 435-447.

Expand source code
def e_sqi(ecg_cleaned, peaks, sampling_rate):
    """ Returns a sqi based off of the sum of energy of detected QRS waveforms over energy of entire signal.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """
    total = 0
    
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_cleaned[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_cleaned))]    
        # energy of QRS
        total += np.dot(window, window)

    return total / np.dot(ecg_cleaned, ecg_cleaned)
def ent_sqi(signal)

Returns the sample entropy of the signal.

Parameters

signal : np.array
The input signal
Expand source code
def ent_sqi(signal):
    """ Returns the sample entropy of the signal.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return antropy.sample_entropy(signal)
def f_sqi(signal, window_size=3, threshold=1e-07)

Returns an sqi that computes percentage of flatness in the signal. Constant values over a longer period (flat line) may be caused by sensor failures.

Parameters

signal : np.array
The input signal
window_size : int
Window to detect flat line, larger values will lower detection sensitivity
threshold : float
Threshold of flatness. I.e. Where (max-min) is considered equivalent

Reference

Source: https://github.com/DHI/tsod/blob/main/tsod/detectors.py

Expand source code
def f_sqi(signal, window_size=3, threshold=1e-7):
    """Returns an sqi that computes percentage of flatness in the signal. 
    Constant values over a longer period (flat line) may be caused by sensor failures.

    Parameters
    ----------
    signal : np.array
        The input signal
    window_size : int
        Window to detect flat line, larger values will lower detection sensitivity
    threshold : float
        Threshold of flatness. I.e. Where (max-min) is considered equivalent  

    Reference
    ----------
    Source: https://github.com/DHI/tsod/blob/main/tsod/detectors.py
    """
    if window_size >= len(signal): return 0

    rolling = np.lib.stride_tricks.sliding_window_view(signal, window_shape=window_size)
    rollmax = np.nanmax(rolling, axis=1)
    rollmin = np.nanmin(rolling, axis=1)
    
    anomalies = rollmax - rollmin < threshold
    anomalies[0] = False  # first element cannot be determined
    anomalies[-1] = False

    idx = np.where(anomalies)[0]
    if idx is not None:
        # assuming window size = 3
        # remove also points before and after each detected anomaly
        anomalies[idx[idx > 0] - 1] = True
        maxidx = len(anomalies) - 1
        anomalies[idx[idx < maxidx] + 1] = True

    return np.sum(anomalies) / len(anomalies)
def get_ecg_sqis(ecg_raw, ecg_cleaned, peaks, sampling_rate, window)

Returns all ECG sqis.

Parameters

ecg_raw : np.array
Input unfiltered ECG signal
ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
peaks : list
List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
sampling_frequency : int
Input ecg sampling frequency
window : int
Length of each window in seconds. See signal_psd().

Returns

List : list
List of 11 implemented single channel ECG sqis and 10 time series sqis for a total of 22 features
Expand source code
def get_ecg_sqis(ecg_raw, ecg_cleaned, peaks, sampling_rate, window):
    """ Returns all ECG sqis.

    Parameters
    ----------
    ecg_raw : np.array
        Input unfiltered ECG signal
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency
    window : int
        Length of each window in seconds. See `signal_psd()`.

    Returns
    ----------
    List : list
        List of 11 implemented single channel ECG sqis and 10 time series sqis
        for a total of 22 features 
    """
    # ecg_cleaned = nk.ecg_clean(ecg_raw, sampling_rate=sampling_rate, method="neurokit")
    # peaks = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']

    ecg_sqis = [
        orphanidou2015_sqi(ecg_cleaned, sampling_rate, show=False),
        averageQRS_sqi(ecg_cleaned, sampling_rate),
        zhao2018_sqi(ecg_cleaned, sampling_rate),
        p_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[5, 15], dem_spectrum=[5, 40]),
        bas_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[0, 1], dem_spectrum=[0, 40]),
        c_sqi(ecg_cleaned, sampling_rate),
        q_sqi(ecg_cleaned, sampling_rate, matching_qrs_frames_tolerance=50),
        q_sqi(ecg_cleaned, sampling_rate, method='b_sqi'),
        bs_sqi(ecg_cleaned, peaks, sampling_rate),
        e_sqi(ecg_cleaned, peaks, sampling_rate),
        hf_sqi(ecg_raw, peaks, sampling_rate),
        rsd_sqi(ecg_cleaned, peaks, sampling_rate)
    ]

    generic_sqis = get_generic_sqis(signal=ecg_raw)
    return ecg_sqis + generic_sqis
def get_generic_sqis(signal)

Returns sqis for a generic signal.

Parameters

signal : np.array
The input signal

Returns

List : list
List of 10 time series sqis
Expand source code
def get_generic_sqis(signal):
    """ Returns sqis for a generic signal.

    Parameters
    ----------
    signal : np.array
        The input signal

    Returns
    ----------
    List : list
        List of 10 time series sqis
    """
    return [
        k_sqi(signal, kurtosis_method='fisher'),
        s_sqi(signal),
        pur_sqi(signal),
        ent_sqi(signal),
        zc_sqi(signal),
        f_sqi(signal, window_size=3, threshold=1e-7),
        np.nanmean(signal),
        np.nanstd(signal),
        np.nanmax(signal),
        np.nanmin(signal)
    ]
def get_pleth_sqis(pleth_raw, pleth_cleaned)

Returns all Pleth sqis.

Parameters

pleth_raw : np.array
Input unfiltered Pleth signal
pleth_cleaned : np.array
Input filtered Pleth signal

Returns

List : list
List of 1 implemented single channel Pleth sqi and 10 time series sqis for a total of 11 features
Expand source code
def get_pleth_sqis(pleth_raw, pleth_cleaned):
    """ Returns all Pleth sqis.

    Parameters
    ----------
    pleth_raw : np.array
        Input unfiltered Pleth signal
    pleth_cleaned : np.array
        Input filtered Pleth signal
    
    Returns
    ----------
    List : list
        List of 1 implemented single channel Pleth sqi and 10 time series sqis
        for a total of 11 features 
    """
    # pleth_cleaned = nk.ppg_clean(ppg_signal=pleth_raw, sampling_rate=sampling_rate, method='elgendi')
    pleth_sqis = [
        perfusion_sqi(pleth_raw, pleth_cleaned),
    ]

    generic_sqis = get_generic_sqis(signal=pleth_raw)
    return pleth_sqis + generic_sqis
def hf_sqi(ecg_raw, peaks, sampling_rate)

Returns a sqi based on the relative amplitude of high frequency noise.

Parameters

ecg_raw : np.array
Input unfiltered ECG signal
peaks : list
List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
sampling_frequency : int
Input ecg sampling frequency

Reference

Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. "A machine learning approach to multi-level ECG signal quality classification." Computer methods and programs in biomedicine 117.3 (2014): 435-447.

Expand source code
def hf_sqi(ecg_raw, peaks, sampling_rate):
    """ Returns a sqi based on the relative amplitude of high frequency noise.

    Parameters
    ----------
    ecg_raw : np.array
        Input unfiltered ECG signal
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """
    if len(ecg_raw) < 6: return np.nan

    ## integer coefficients high pass filter y(j) = x(j) − 2x(j − 1) + x(j − 2)
    y = ecg_raw[:-2] - 2*ecg_raw[1:-1] + ecg_raw[2:]
    s = y[:-5] + y[1:-4] + y[2:-3] + y[3:-2] + y[4:-1] + y[5:]
    
    total = []
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_raw[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_raw))]    
        # energy of QRS

        Ra = np.max(window) - np.min(window)

        window = s[max(int(peak-0.28*sampling_rate), 0) : min(int(peak-0.05*sampling_rate), len(s))]
        H = np.nanmean(window)

        total.append(Ra / H)
    return np.nanmean(total)
def i_sqi(ecg_cleaned_list, sampling_rate)

Returns a sqi that measures inter-channel signal quality. Calculated as the ratio of the number of matched beats (Nmatched) to all detected beats (Nall) between a given lead and all other synchronous ECG.

Parameters

ecg_cleaned_list : List of np.array
List of cleaned ECG signal in the form of a vector of values.
sampling_rate : int
The sampling frequency of the signal (in Hz, i.e., samples/second).

Reference

Li, Qiao, Roger G. Mark, and Gari D. Clifford. "Robust heart rate estimation from multiple asynchronous noisy sources using signal quality indices and a Kalman filter." Physiological measurement 29.1 (2007): 15.

Expand source code
def i_sqi(ecg_cleaned_list, sampling_rate):
    """Returns a sqi that measures inter-channel signal quality. 
    Calculated as the ratio of the number of matched beats (Nmatched) to 
    all detected beats (Nall) between a given lead and all other synchronous ECG.

    Parameters
    ----------
    ecg_cleaned_list : List of np.array
        List of cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).

    Reference
    ----------
    Li, Qiao, Roger G. Mark, and Gari D. Clifford. 
    "Robust heart rate estimation from multiple asynchronous noisy sources using signal quality indices and a Kalman filter." 
    Physiological measurement 29.1 (2007): 15.    
    """
    qrs_frames_list = [nk.ecg_peaks(ecg_cleaned, sampling_rate, method='hamilton2002')[1]['ECG_R_Peaks'] for ecg_cleaned in ecg_cleaned_list]
    frame_tolerance = .15*sampling_rate
    
    max_matching = 0
    for i in range(len(qrs_frames_list)):
        for j in range(i+1, len(qrs_frames_list)):
            matching = _get_num_matching(qrs_frames_list[i], qrs_frames_list[j], frame_tolerance)
            max_matching = max(max_matching, matching)

    return max_matching
def k_sqi(signal, kurtosis_method='fisher')

Return the kurtosis of the signal, with Fisher's or Pearson's method.

Parameters

signal : np.array
The input signal
kurtosis_method : str
Compute kurtosis (ksqi) based on "fisher" (default) or "pearson" definition.

Reference

Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py

Expand source code
def k_sqi(signal, kurtosis_method='fisher'):
    """Return the kurtosis of the signal, with Fisher's or Pearson's method.

    Parameters
    ----------
    signal : np.array
        The input signal
    kurtosis_method : str
        Compute kurtosis (ksqi) based on "fisher" (default) or "pearson" definition.

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    if kurtosis_method == "fisher":
        return scipy.stats.kurtosis(signal, fisher=True)
    elif kurtosis_method == "pearson":
        return scipy.stats.kurtosis(signal, fisher=False)
def orphanidou2015_sqi(ecg_cleaned, sampling_rate, show=False)

Implementation of template matching approach introduced by Orphanidou et al. Returns the average correlation coefficient (scipy.stats.pearsonr) of the QRS waveforms that ranges from -1 to 1

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
sampling_rate : int
The hz of the input ECG signal
show : bool
Flag whether to show the obtained peaks

Reference

C. Orphanidou, T. Bonnici, P. Charlton, D. Clifton, D. Vallance and L. Tarassenko, "Signal quality indices for the electrocardiogram and photoplethysmogram: Derivation and applications to wireless monitoring", IEEE J. Biomed. Health Informat., vol. 19, no. 3, pp. 832-838, May 2015.

Expand source code
def orphanidou2015_sqi(ecg_cleaned, sampling_rate, show=False):
    """ Implementation of template matching approach introduced by Orphanidou et al. 
    Returns the average correlation coefficient (scipy.stats.pearsonr) of the QRS waveforms that ranges from -1 to 1

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The hz of the input ECG signal
    show : bool
         Flag whether to show the obtained peaks

    Reference
    ----------
    C. Orphanidou, T. Bonnici, P. Charlton, D. Clifton, D. Vallance and L. Tarassenko, 
    "Signal quality indices for the electrocardiogram and photoplethysmogram: Derivation and applications to wireless monitoring", 
    IEEE J. Biomed. Health Informat., vol. 19, no. 3, pp. 832-838, May 2015.
    """
    
    try:
        ## out = ('ts', 'filtered', 'rpeaks', 'templates_ts', 'templates', 'heart_rate_ts', 'heart_rate')
        out = biosppy.signals.ecg.ecg(signal=ecg_cleaned, sampling_rate=sampling_rate, show=show)
    except Exception as e:
        return np.nan

    nni = pyhrv.tools.nn_intervals(rpeaks=out['rpeaks'])
    ## nni is in ms, convert to s
    nni = nni / 1000

    ## obtain median rr interval
    median_qrs_window = np.median(out['rpeaks'][1:] - out['rpeaks'][:-1]).astype(int)

    ## check heart rate in reasonable range of [40,180]
    if np.any(out['heart_rate'] < 40) or np.any(180 < out['heart_rate']):
        return 1.

    ## if all nni are less than 3 seconds
    if np.any(nni > 3):
        return 1.

    ## check max_rr_interval / min_rr_interval < 2.2
    if (np.max(nni) / np.min(nni)) > 2.2:
        return 1.

    templates = np.array([
        ecg_cleaned[r_peak-median_qrs_window//2:r_peak+median_qrs_window//2] 
        for r_peak in out['rpeaks']
        if (r_peak-median_qrs_window//2 >= 0) and (r_peak+median_qrs_window//2 < len(ecg_cleaned))
    ])
    
    average_template = np.mean(templates, axis=0)

    ## scipy.stats.pearsonr returns r, p_value
    corrcoefs = [
        scipy.stats.pearsonr(x=templates[i], y=average_template)[0]
        for i in range(len(templates))
        ]

    return np.mean(corrcoefs)
def p_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[5, 15], dem_spectrum=[5, 40])

Returns a sqi based off of the Power Spectrum Distribution of QRS Waveform.

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
sampling_rate : int
The sampling frequency of the signal (in Hz, i.e., samples/second).
window : int
Length of each window in seconds. See signal_psd().

Reference

Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py

Expand source code
def p_sqi(ecg_cleaned, sampling_rate, window, num_spectrum=[5, 15], dem_spectrum=[5, 40]):
    """Returns a sqi based off of the Power Spectrum Distribution of QRS Waveform.

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).
    window : int
        Length of each window in seconds. See `signal_psd()`.

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    try:
        psd = nk.signal_power(
            ecg_cleaned,
            sampling_rate=sampling_rate,
            frequency_band=[num_spectrum, dem_spectrum],
            method="welch",
            normalize=False,
            window=window
            )

        num_power = psd.iloc[0][0]
        dem_power = psd.iloc[0][1]

        return num_power / dem_power
    except Exception as e:
        return np.nan
def pca_sqi(signal)

Returns a PCA sqi of the input signals.

Parameters

signal : array-like of shape (n_samples, n_features) Multivariate time-series, shape is at least 2 dimensional

Expand source code
def pca_sqi(signal):
    """ Returns a PCA sqi of the input signals.

    Parameters 
    ----------
    signal : array-like of shape (n_samples, n_features)
        Multivariate time-series, shape is at least 2 dimensional
    """
    # todo: Currently, we are only using single channel (1d) swis
    pca = sklearn.decomposition.PCA(n_components=None)
    pca.fit(signal)

    return np.sum(pca.singular_values_[:5]) / np.sum(pca.singular_values_)
def perfusion_sqi(pleth_raw, pleth_cleaned)

Returns perfusion of Pleth. The perfusion index is the ratio of the pulsatile blood flow to the nonpulsatile or static blood in peripheral tissue. In other words, it is the difference of the amount of light absorbed through the pulse of when light is transmitted through the finger. It is calculated as AC/DC * 100

Parameters

pleth_raw : np.array
Input unfiltered Pleth signal
pleth_cleaned : np.array
Input filtered Pleth signal
Expand source code
def perfusion_sqi(pleth_raw, pleth_cleaned):
    """Returns perfusion of Pleth. The perfusion index is the ratio of the pulsatile blood flow to the nonpulsatile 
    or static blood in peripheral tissue. In other words, it is the difference of the 
    amount of light absorbed through the pulse of when light is transmitted through 
    the finger. It is calculated as AC/DC * 100

    Parameters
    ----------
    pleth_raw : np.array
        Input unfiltered Pleth signal
    pleth_cleaned : np.array
        Input filtered Pleth signal
    """
    try:
        return (np.nanmax(pleth_cleaned) - np.nanmin(pleth_cleaned)) / np.nanmean(pleth_raw) * 100
    except Exception as e:
        return np.nan    
def pur_sqi(signal)

Returns the signal purity of the input. In the case of a periodic signal with a single dominant frequency, it takes the value of one and approaches zero for non-sinusoidal noisy signals. antropy.hjorth_params returns 2 floats: (mobility, complexity). Complexity is the value we want.

Parameters

signal : np.array
The input signal
Expand source code
def pur_sqi(signal):
    """ Returns the signal purity of the input. 
    In the case of a periodic signal with a single dominant frequency, 
    it takes the value of one and approaches zero for non-sinusoidal noisy signals.
    antropy.hjorth_params returns 2 floats: (mobility, complexity).
    Complexity is the value we want.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return antropy.hjorth_params(signal)[1]
def q_sqi(ecg_cleaned, sampling_rate, matching_qrs_frames_tolerance=50, method='q_sqi')

Returns a sqi that measures matching Degree of R Peak Detection. Two R wave detection algorithms are compared with their respective number of R waves detected (Hamilton vs Stationary Wavelet Transform).

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
sampling_frequency : int
Input ecg sampling frequency

Reference

Source: https://github.com/Aura-healthcare/ecg_qc/blob/main/ecg_qc/sqi_computing/sqi_rr_intervals.py

Expand source code
def q_sqi(ecg_cleaned, sampling_rate, matching_qrs_frames_tolerance=50, method='q_sqi'):
    """Returns a sqi that measures matching Degree of R Peak Detection. 
    Two R wave detection algorithms are compared with their respective number
    of R waves detected (Hamilton vs Stationary Wavelet Transform).
    
    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: https://github.com/Aura-healthcare/ecg_qc/blob/main/ecg_qc/sqi_computing/sqi_rr_intervals.py
    """

    ## returns signals: df, info: dict of {'ECG_R_Peaks', 'sampling_rate'}
    if method=='q_sqi':
        qrs_frames_1 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='hamilton2002')[1]['ECG_R_Peaks']
        qrs_frames_2 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
        ## compute_qrs_frames_correlation
        # single_frame_duration = 1/sampling_rate
        frame_tolerance = matching_qrs_frames_tolerance * 0.001 * sampling_rate
    elif method=='b_sqi':
        qrs_frames_1 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='zong2003')[1]['ECG_R_Peaks']
        qrs_frames_2 = nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='pantompkins1985')[1]['ECG_R_Peaks']
        ## compute_qrs_frames_correlation (150 ms)
        frame_tolerance = .15*sampling_rate
    else:
        raise NotImplementedError

    ## Catch complete failed QRS detection

    matching_frames = _get_num_matching(qrs_frames_1, qrs_frames_2, frame_tolerance)

    if method=='q_sqi':
        if (len(qrs_frames_1) == 0 or len(qrs_frames_2) == 0): # error
            return 0.
        else:
            return 2 * matching_frames / (len(qrs_frames_1) + len(qrs_frames_2))
    elif method=='b_sqi':
        if (len(qrs_frames_1) + len(qrs_frames_2) - matching_frames) == 0: # error
            return 0.
        else:
            return matching_frames / (len(qrs_frames_1) + len(qrs_frames_2) - matching_frames)
def rsd_sqi(ecg_cleaned, peaks, sampling_rate)

Returns a sqi based on the relative standard deviation.

Parameters

ecg_raw : np.array
Input unfiltered ECG signal
peaks : list
List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
sampling_frequency : int
Input ecg sampling frequency

Reference

Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. "A machine learning approach to multi-level ECG signal quality classification." Computer methods and programs in biomedicine 117.3 (2014): 435-447.

Expand source code
def rsd_sqi(ecg_cleaned, peaks, sampling_rate):
    """ Returns a sqi based on the relative standard deviation.

    Parameters
    ----------
    ecg_raw : np.array
        Input unfiltered ECG signal
    peaks : list 
        List of rpeak locations like in nk.ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, method='kalidas2017')[1]['ECG_R_Peaks']
    sampling_frequency : int
        Input ecg sampling frequency

    Reference
    ----------
    Source: Li, Qiao, Cadathur Rajagopalan, and Gari D. Clifford. 
    "A machine learning approach to multi-level ECG signal quality classification." 
    Computer methods and programs in biomedicine 117.3 (2014): 435-447. 
    """
    
    total = []
    for peak in peaks:
        # Ra is peak to peak amplitude of ECG signal from (R-0.07s, R+0.08s)
        window = ecg_cleaned[max(int(peak-0.07*sampling_rate), 0) : min(int(peak+0.08*sampling_rate), len(ecg_cleaned))]    
        # energy of QRS
        sigma_r = np.nanstd(window)

        window = ecg_cleaned[max(int(peak-0.2*sampling_rate), 0) : min(int(peak+0.2*sampling_rate), len(ecg_cleaned))]
        sigma_a = np.nanstd(window)
        
        total.append(sigma_r / (sigma_a*2))
    return np.nanmean(total)
def s_sqi(signal)

Return the skewness of the signal.

Parameters

signal : np.array
The input signal
Expand source code
def s_sqi(signal):
    """Return the skewness of the signal.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return scipy.stats.skew(signal)
def snr_sqi(signal_raw, signal_cleaned)

Returns the signal to noise ratio (SNR). There are many ways to define SNR, here, we use std of filtered vs std of raw signal.

Parameters

signal_raw : np.array
Raw input signal
signal_cleaned : np.array
Cleaned input signal
Expand source code
def snr_sqi(signal_raw, signal_cleaned):
    """Returns the signal to noise ratio (SNR). There are many ways to define SNR, here, we use std of filtered vs std of raw signal.

    Parameters
    ----------
    signal_raw : np.array
        Raw input signal
    signal_cleaned : np.array    
        Cleaned input signal
    """
    return np.std(np.abs(signal_cleaned)) / np.std(np.abs(signal_raw))
def zc_sqi(signal)

Returns the zero crossing rate.

Parameters

signal : np.array
The input signal
Expand source code
def zc_sqi(signal):
    """Returns the zero crossing rate.

    Parameters
    ----------
    signal : np.array
        The input signal
    """
    return antropy.num_zerocross(signal)
def zhao2018_sqi(ecg_cleaned, sampling_rate)

Returns the zhao2018 output for ECG signal quality prediction. The metod extracts several signal quality indexes (sqis): QRS wave power spectrum distribution psqi, kurtosis ksqi, and baseline relative power bassqi. An additional R peak detection match qsqi was originally computed in the paper but left out in this algorithm. The indices were originally weighted with a ratio of [0.4, 0.4, 0.1, 0.1] to generate the final classification outcome, but because qsqi was dropped, the weights have been rearranged to [0.6, 0.2, 0.2] for psqi, ksqi and bassqi respectively

Parameters

ecg_cleaned : np.array
The cleaned ECG signal in the form of a vector of values.
sampling_rate : int
The sampling frequency of the signal (in Hz, i.e., samples/second).

Reference

Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py

Expand source code
def zhao2018_sqi(ecg_cleaned, sampling_rate):
    """Returns the zhao2018 output for ECG signal quality prediction. 
    The metod extracts several signal quality indexes (sqis):
    QRS wave power spectrum distribution psqi, kurtosis ksqi, and baseline relative power bassqi.
    An additional R peak detection match qsqi was originally computed in the paper but left out
    in this algorithm. The indices were originally weighted with a ratio of [0.4, 0.4, 0.1, 0.1] to
    generate the final classification outcome, but because qsqi was dropped,
    the weights have been rearranged to [0.6, 0.2, 0.2] for psqi, ksqi and bassqi respectively

    Parameters
    ----------
    ecg_cleaned : np.array
        The cleaned ECG signal in the form of a vector of values.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).

    Reference
    ----------
    Source: https://github.com/neuropsychology/NeuroKit/blob/master/neurokit2/ecg/ecg_quality.py
    """
    try:
        rating = nk.ecg_quality(ecg_cleaned=ecg_cleaned, rpeaks=None, sampling_rate=sampling_rate, method="zhao2018", approach='fuzzy')
        if rating == "Excellent":
            return 2
        elif rating == "Unnacceptable":
            return 0
        else:
            return 1
    except Exception as e:
        # print(e)
        return np.nan