Source code for pyPCG.preprocessing

from pyPCG import pcg_signal
import scipy.signal as signal
import numpy as np
import pywt
import copy
import emd
from typing import Callable, TypedDict

[docs] def envelope(sig: pcg_signal) -> pcg_signal: """Calculates the envelope of the signal based on Hilbert transformation Args: sig (pcg_signal): input signal Returns: pcg_signal: envelope """ ret_sig = copy.deepcopy(sig) ret_sig.processing_log.append("Envelope") ret_sig.data = np.abs(signal.hilbert(ret_sig.data)) # type: ignore return ret_sig
[docs] def homomorphic(sig: pcg_signal, filt_ord: int = 6, filt_cutfreq: float = 8) -> pcg_signal: """Calculate the homomoprphic envelope of the signal Args: sig (pcg_signal): input signal filt_ord (int, optional): lowpass filter order. Defaults to 6. filt_cutfreq (float, optional): lowpass filter cutoff frequency. Defaults to 8. Raises: ValueError: The cutoff frequency exceeds the Nyquist limit Returns: pcg_signal: homomoprhic envelope """ if filt_cutfreq>sig.fs/2: raise ValueError("Filter cut frequency exceeds Nyquist limit") ret_sig = copy.deepcopy(sig) env = envelope(sig).data ret_sig.processing_log.append(f"Homomorphic envelope (order-{filt_ord},cut-{filt_cutfreq})") lp = signal.butter(filt_ord,filt_cutfreq,output='sos',fs=sig.fs,btype='lowpass') env[env<=0] = np.finfo(float).eps filt = signal.sosfiltfilt(lp,np.log(env)) ret_sig.data = np.exp(filt) return ret_sig
[docs] def filter(sig: pcg_signal, filt_ord: int, filt_cutfreq: float, filt_type: str = "LP") -> pcg_signal: """Filters the signal based on the input parameters with a Butterworth filter design Args: sig (pcg_signal): input signal filt_ord (int): filter order filt_cutfreq (float): filter cutoff frequency filt_type (str, optional): filter type: `"LP"` (low-pass) or `"HP"` (high-pass). Defaults to `"LP"`. Raises: NotImplementedError: Other filter type ValueError: Filter cutoff exceeds Nyquist limit Returns: pcg_signal: filtered signal """ longname = "" if filt_type == "LP": longname = "lowpass" elif filt_type == "HP": longname = "highpass" else: raise NotImplementedError("Only HP and LP filters are supported right now") if filt_cutfreq>sig.fs/2: raise ValueError("Filter cut frequency exceeds Nyquist limit") filt = signal.butter(filt_ord,filt_cutfreq,output='sos',fs=sig.fs,btype=longname) ret_sig = copy.deepcopy(sig) ret_sig.data = signal.sosfiltfilt(filt,ret_sig.data) ret_sig.processing_log.append(f"{filt_type} Filter (order-{filt_ord}, cut-{filt_cutfreq})") return ret_sig
[docs] def wt_denoise(sig: pcg_signal, th: float=0.2, wt_family: str = "coif4", wt_level: int = 5) -> pcg_signal: """Denoise the signal with a wavelet thresholding method Args: sig (pcg_signal): input noisy signal th (float, optional): threshold value given as a percentage of maximum. Defaults to 0.2. wt_family (str, optional): wavelet family. Defaults to "coif4". wt_level (int, optional): wavelet decomposition level. Defaults to 5. Returns: pcg_signal: denoised signal """ ret_sig = copy.deepcopy(sig) th_coeffs = [] coeffs = pywt.wavedec(ret_sig.data,wt_family,level=wt_level) for coeff in coeffs: th_coeffs.append(pywt.threshold(coeff,th*max(coeff))) ret_sig.data = pywt.waverec(th_coeffs,wt_family) ret_sig.processing_log.append(f"Wavelet denoise (family-{wt_family}, level-{wt_level}, th-{th})") return ret_sig
[docs] def slice_signal(sig: pcg_signal, time_len: float=60, overlap: float=0) -> list[pcg_signal]: """Slice long signal into a list of shorter signals Args: sig (pcg_signal): input long signal time_len (float, optional): desired short timelength [seconds]. Defaults to 60. overlap (float, optional): overlap percentage. Defaults to 0. Returns: list[pcg_signal]: list of shorter signals """ time_len_s = round(time_len*sig.fs) step = time_len_s-round(time_len_s*overlap) start = 0 acc = [] at_end = False while(not at_end): end = start+time_len_s if end >= len(sig.data): end = len(sig.data) at_end = True sliced = pcg_signal(sig.data[start:end],sig.fs) acc.append(sliced) start += step return acc
[docs] def emd_denoise_sth(sig: pcg_signal) -> pcg_signal: """EMD based denoising with soft threshold method. Based on: Boudraa, Abdel-O & Cexus, Jean-Christophe & Saidi, Zazia. (2005). EMD-Based Signal Noise Reduction. Signal Processing. 1. Note: Tau parameter calculation modified from the original Args: sig (pcg_signal): input signal Returns: pcg_signal: denoised signal """ imf = emd.sift.sift(sig.data) mad = np.median(np.abs(imf - np.median(imf,axis=0)),axis=0) #type: ignore sigma = mad/0.6745 # MODIFIED TAU tau = sigma*np.sqrt(2) th_imf = pywt.threshold(imf,tau) #type: ignore ret_sig = copy.deepcopy(sig) ret_sig.data = np.sum(th_imf,axis=1) ret_sig.processing_log.append("EMD denoising (soft th)") return ret_sig
[docs] def emd_denoise_savgol(sig: pcg_signal, window: int=10, poly: int=3) -> pcg_signal: """EMD based denoising method with Savoy-Golatzky filter method. Based on: Boudraa, Abdel-O & Cexus, Jean-Christophe & Saidi, Zazia. (2005). EMD-Based Signal Noise Reduction. Signal Processing. 1. Args: sig (pcg_signal): input signal window (int, optional): savgol filter window size [samples]. Defaults to 10. poly (int, optional): savgol polynomial degree to fit. Defaults to 3. Returns: pcg_signal: denoised signal """ imf = emd.sift.sift(sig.data) th_imf = signal.savgol_filter(imf,window,poly,mode="nearest") ret_sig = copy.deepcopy(sig) ret_sig.data = np.sum(th_imf,axis=1) ret_sig.processing_log.append("EMD denoising (savgol)") return ret_sig
[docs] def wt_denoise_sth(sig: pcg_signal, wt_family: str = "coif4", wt_level: int = 5) -> pcg_signal: """Denoise the signal with automatic wavelet thresholding method. Threshold is calculated automatically. Based on: D.L. Donoho, and I.M. Johnstone, Ideal spatial adaptation by wavelet shrinkage, Biometrika, vol. 81, no. 3, pp. 425-455, 1994 Args: sig (pcg_signal): input noisy signal wt_family (str, optional): wavelet family. Defaults to "coif4". wt_level (int, optional): wavelet decomposition level. Defaults to 5. Returns: pcg_signal: denoised signal """ ret_sig = copy.deepcopy(sig) th_coeffs = [] coeffs = pywt.wavedec(ret_sig.data,wt_family,level=wt_level) for coeff in coeffs: mad = np.median(np.abs(coeff - np.median(coeff))) #type: ignore sigma = mad/0.6745 # MODIFIED TAU tau = sigma*np.sqrt(2) th_coeffs.append(pywt.threshold(coeff,tau)) ret_sig.data = pywt.waverec(th_coeffs,wt_family) ret_sig.processing_log.append(f"Wavelet denoise (family-{wt_family}, level-{wt_level})") return ret_sig
[docs] def resample(sig: pcg_signal, target_fs:int) -> pcg_signal: """Resample signal to target samplerate Args: sig (pcg_signal): input signal target_fs (int): target samplerate Returns: pcg_signal: resampled signal """ ret_sig = copy.deepcopy(sig) ret_sig.data = signal.resample_poly(ret_sig.data,target_fs,ret_sig.fs) ret_sig.fs = target_fs ret_sig.processing_log.append(f"Resample to {target_fs} Hz") return ret_sig
[docs] class process_config(TypedDict): """Type to hold processing calculation configs""" step: Callable """function for the calculation""" params: dict[str,int|float|str] """parameters to pass to the function as keyword arguments"""
[docs] class process_pipeline: """Processing pipeline. One step's input is the previous step's output Attributes: steps (list[Callable | process_config]): List of steps as functions or function and parameters as keyword dictionary Example: Creating a simple pipeline >>> import pyPCG >>> my_pipeline = pyPCG.process_pipeline(pyPCG.zero_center, pyPCG.unit_scale) >>> print(my_pipeline) PCG processing pipeline [2 steps] Creating a pipeline with parameters: Option 1: Create a dictionary with the appropriate function with the parameters passed as keyword arguments For an easier experience, use the `process_config` type >>> import pyPCG >>> import pyPCG.preprocessing as preproc >>> step_1 = {"step":preproc.filter,"params":{"filt_ord":6,"filt_cutfreq":100,"filt_type":"LP"}} >>> step_2 = {"step":preproc.filter,"params":{"filt_ord":6,"filt_cutfreq":20,"filt_type":"HP"}} >>> my_pipeline = pyPCG.process_pipeline(step_1,step_2) Option 2: Using ``functools.partial`` >>> import pyPCG >>> import pyPCG.preprocessing as preproc >>> from functools import partial >>> step_1 = partial(preproc.filter, filt_ord=6, filt_cutfreq=100, filt_type="LP") >>> step_2 = partial(preproc.filter, filt_ord=6, filt_cutfreq=20, filt_type="HP") >>> my_pipeline = pyPCG.process_pipeline(step_1,step_2) Use the above pipeline: >>> import pyPCG.io as pcg_io >>> data, fs = pcg_io.read_signal_file("example.wav","wav") >>> example = pyPCG.pcg_signal(data,fs) >>> processed = my_pipeline.run(example) """ def __init__(self, *configs: Callable|process_config) -> None: """Create processing pipeline object""" self.steps = [] for k in configs: self.steps.append(k) def __repr__(self) -> str: return f"PCG processing pipeline [{len(self.steps)} steps]"
[docs] def run(self, input: pcg_signal) -> pcg_signal: """Run the processing pipeline Args: input (pcg_signal): input signal Returns: pcg_signal: processed signal """ out = input for step in self.steps: if type(step) is process_config or type(step) is dict: out = step["step"](out,**step["params"]) else: out = step(out) return out
if __name__ == '__main__': print("Preprocessing functions")