Processing pipeline tutorial

Import pyPCG modules

import pyPCG as pcg
import pyPCG.io as pcg_io
import pyPCG.preprocessing as preproc
import matplotlib.pyplot as plt
%matplotlib widget

Read in data and create signal object

from importlib.resources import files
data, fs = pcg_io.read_signal_file(str(files('pyPCG').joinpath("data").joinpath("example.wav")),"wav")
signal = pcg.pcg_signal(data,fs)

Process the signal

Suppose we want to process the signal and extract its envelope.

First, normalize the signal and lowpass filter the result (these parameters are just an example)

norm_signal = pcg.normalize(signal)
lp_signal = preproc.filter(norm_signal,filt_ord=6,filt_cutfreq=100,filt_type="LP")
print(lp_signal)
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale', 'LP Filter (order-6, cut-100)']

Then apply a highpass filter to complete the bandpass filtering, and calculate the envelope

hp_signal = preproc.filter(lp_signal,filt_ord=6,filt_cutfreq=20,filt_type="HP")
env_signal = preproc.envelope(hp_signal)
print(env_signal)
plt.figure()
pcg.plot(norm_signal)
pcg.plot(env_signal)
plt.legend(["Normalized signal","Envelope"])
plt.xlim((0,2))
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale', 'LP Filter (order-6, cut-100)', 'HP Filter (order-6, cut-20)', 'Envelope']
(0.0, 2.0)

Solving it this way creates many local variables and many lines of code, which can reduce the readability and reusability of the processing chain. This becomes more apparent when multiple files need to be processed.

This is a common pattern: each function’s input is simply the previous function’s output.
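The chaining pattern can be illustrated without any library code. The following is only a minimal sketch: compose_steps and the two toy steps are hypothetical names made up for illustration and operate on plain lists, not on pyPCG signal objects.

```python
from functools import reduce

def compose_steps(*steps):
    """Return a function that applies each step in order, left to right."""
    def run(value):
        # Feed the output of each step into the next one.
        return reduce(lambda acc, step: step(acc), steps, value)
    return run

# Toy steps acting on a list of samples (stand-ins, not pyPCG functions).
def toy_zero_center(samples):
    mean = sum(samples) / len(samples)
    return [s - mean for s in samples]

def toy_rectify(samples):
    return [abs(s) for s in samples]

chain = compose_steps(toy_zero_center, toy_rectify)
result = chain([1.0, 2.0, 3.0])
print(result)  # [1.0, 0.0, 1.0]
```

Because the chain is a single object, it can be applied to any number of inputs without repeating the intermediate variables.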

[Figure: pipeline]

To solve the above-mentioned problems, we can use a pipeline object.

Process pipeline object

Basic usage

As a small example, we can recreate normalize by combining zero_center and unit_scale

norm_pipeline = preproc.process_pipeline(pcg.zero_center,pcg.unit_scale)

To use the pipeline, call its run method on the input signal

n_pipelined_signal = norm_pipeline.run(signal)
print(n_pipelined_signal)

plt.figure()
pcg.plot(signal)
plt.xlim((0,2))

plt.figure()
pcg.plot(n_pipelined_signal,zeroline=True)
plt.xlim((0,2))
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale']
(0.0, 2.0)

Any number of steps can be used when creating a pipeline

dummy_pipeline = preproc.process_pipeline(pcg.zero_center,pcg.unit_scale,preproc.envelope)
dummy_signal = dummy_pipeline.run(signal)
print(dummy_signal)
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale', 'Envelope']

Passing parameters to pipeline steps

Pipeline steps must be functions which take only one argument, the signal, and produce only one output, the processed signal.

The signature should look something like: function(pyPCG.pcg_signal) -> pyPCG.pcg_signal
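To make the signature requirement concrete, here is a hedged sketch that uses a stand-in class. ToySignal, toy_rectify and toy_scale are hypothetical and are not part of pyPCG; the attribute names are assumptions chosen for illustration only.

```python
from dataclasses import dataclass, field, replace

@dataclass
class ToySignal:
    """Hypothetical stand-in for a signal object; not the pyPCG class."""
    data: list
    fs: int
    log: list = field(default_factory=list)

def toy_rectify(sig: ToySignal) -> ToySignal:
    """Fits the step signature: one signal in, one new signal out."""
    return replace(sig, data=[abs(x) for x in sig.data], log=sig.log + ["Rectify"])

def toy_scale(sig: ToySignal, factor: float) -> ToySignal:
    """Does not fit the step signature as-is: it needs an extra argument."""
    return replace(sig, data=[x * factor for x in sig.data],
                   log=sig.log + [f"Scale ({factor})"])

raw = ToySignal(data=[-1.0, 0.5], fs=333, log=["File read in"])
out = toy_rectify(raw)
print(out.data, out.log)  # [1.0, 0.5] ['File read in', 'Rectify']
```

In this sketch toy_rectify could be used as a step directly, while toy_scale would first need its factor argument fixed.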

Functions that take additional parameters besides the input signal, such as filter, can also be used with a small modification.

Option 1: Using functools.partial

By creating partially applied versions of the original functions, we can create the required function signature.

from functools import partial
lp_filter = partial(preproc.filter, filt_ord=6, filt_cutfreq=100, filt_type="LP")

# lp_filter only takes one input
partial_signal = lp_filter(signal)
print(partial_signal)
PCG signal [60.0s 333Hz] ['File read in', 'LP Filter (order-6, cut-100)']
opt1_pipeline = preproc.process_pipeline(pcg.zero_center,pcg.unit_scale,lp_filter,preproc.envelope)
opt1_signal = opt1_pipeline.run(signal)
print(opt1_signal)
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale', 'LP Filter (order-6, cut-100)', 'Envelope']
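For readers unfamiliar with functools.partial, the sketch below shows what the partial object holds and how it behaves when called. toy_filter is a hypothetical stand-in that only builds a label string; it is not preproc.filter.

```python
from functools import partial

def toy_filter(samples, cutoff, kind):
    """Hypothetical stand-in for a parameterized step; it only labels its input."""
    return f"{kind}({cutoff}) <- {samples}"

# Fix the keyword arguments, leaving only the first positional argument open.
lp = partial(toy_filter, cutoff=100, kind="LP")

print(lp.func is toy_filter)  # True
print(lp.keywords)            # {'cutoff': 100, 'kind': 'LP'}
print(lp("x"))                # LP(100) <- x

# Keyword arguments given at call time override the stored ones.
print(lp("x", cutoff=50))     # LP(50) <- x
```

The stored keywords stay inspectable on the partial object, which can help when debugging a chain of configured steps.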

Option 2: Using a process_config

A pipeline object can also take a step as a special dictionary: the "step" key holds the function, and the "params" key holds a dictionary of its parameters (key-value pairs).

opt2_pipeline = preproc.process_pipeline(pcg.zero_center,pcg.unit_scale,
                                         {"step":preproc.filter,"params":{"filt_ord":6,"filt_cutfreq":100,"filt_type":"LP"}},
                                         preproc.envelope)
opt2_signal = opt2_pipeline.run(signal)
print(opt2_signal)
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale', 'LP Filter (order-6, cut-100)', 'Envelope']
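One way to picture how such a dictionary could be handled is sketched below. ToyPipeline is a hypothetical illustration and should not be read as the pyPCG implementation; it only assumes the "step" and "params" keys shown above and works on plain numbers.

```python
class ToyPipeline:
    """Hypothetical sketch of a pipeline; not the pyPCG implementation."""

    def __init__(self, *steps):
        self.steps = list(steps)

    def run(self, value):
        for step in self.steps:
            if isinstance(step, dict):
                # Config-style step: call the function with its stored parameters.
                value = step["step"](value, **step["params"])
            else:
                # Plain step: a one-argument callable.
                value = step(value)
        return value

def add(x, amount):
    return x + amount

def double(x):
    return 2 * x

pipe = ToyPipeline(double, {"step": add, "params": {"amount": 3}}, double)
result = pipe.run(5)
print(result)  # 26, i.e. ((5 * 2) + 3) * 2
```

Compared with functools.partial, the dictionary form keeps the function and its parameters visible as plain data.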

Final pipeline

If we implement the example processing steps mentioned above as a pipeline, it would look like the following

my_pipeline = preproc.process_pipeline(pcg.normalize,
                                   {"step":preproc.filter,"params":{"filt_ord":6,"filt_cutfreq":100,"filt_type":"LP"}},
                                   {"step":preproc.filter,"params":{"filt_ord":6,"filt_cutfreq":20,"filt_type":"HP"}},
                                   preproc.envelope)

processed = my_pipeline.run(signal)
print(processed)

plt.figure()
pcg.plot(processed)
plt.xlim((0,2))
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale', 'LP Filter (order-6, cut-100)', 'HP Filter (order-6, cut-20)', 'Envelope']
(0.0, 2.0)

Modifying a pipeline

You can modify the steps in a pipeline by directly accessing its steps field, although this is not recommended.

For example, let’s insert a denoising step before the envelope, and change the envelope to be the homomorphic envelope

my_pipeline.steps.insert(-1,preproc.wt_denoise_sth)

my_pipeline.steps[-1] = preproc.homomorphic

processed = my_pipeline.run(signal)
print(processed)

plt.figure()
pcg.plot(processed)
plt.xlim((0,2))
PCG signal [60.0s 333Hz] ['File read in', 'Zero center', 'Unit scale', 'LP Filter (order-6, cut-100)', 'HP Filter (order-6, cut-20)', 'Wavelet denoise (family-coif4, level-5)', 'Homomorphic envelope (order-6,cut-8)']
(0.0, 2.0)
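The two modifications above rely on standard Python list semantics, assuming the steps field behaves like a regular list (which the calls above suggest). A small sketch with placeholder strings instead of real step functions:

```python
# Placeholder names standing in for the four steps of the final pipeline.
steps = ["normalize", "lowpass", "highpass", "envelope"]

# insert(-1, x) places x BEFORE the current last element, not at the end.
steps.insert(-1, "denoise")
print(steps)  # ['normalize', 'lowpass', 'highpass', 'denoise', 'envelope']

# Index assignment replaces the last element in place.
steps[-1] = "homomorphic"
print(steps)  # ['normalize', 'lowpass', 'highpass', 'denoise', 'homomorphic']
```

To add a step after the last one, append would be used instead of insert(-1, ...).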