## Preliminaries
Access the following file in google drive:
https://drive.google.com/drive/folders/1JelLO7mhzo-uttHvYrwLIxFmipceRrLP?usp=sharing

Right-click it then: select Organize, Add Shortcut.
Enable also high-RAM runtime via `Runtime > Change runtime` type in the menu. Then select `High-RAM` in the Runtime shape toggle button.

This will allow you to read the files using the provided code in this tutorial!



In [None]:
!pip install PyWavelets speechpy
import os
import librosa
import logging
import numpy as np
import pandas as pd
import pywt
import speechpy
import scipy.io as sio
import scipy.signal
import re

from scipy.io import wavfile
from tqdm import tqdm
import matplotlib.pyplot as plt

#Deep Learning Techniques for Digital Cardiac Auscultation Analysis

In this tutorial, we will explore the use of signal processing and deep learning techniques to analyze heart sound signal, i.e., phonocardiograms (PCGs), collected with a digital stethoscope.

Heart sounds, contain valuable information about the condition of the heart and are commonly used by clinicians to detect cardiac abnormalities. The advancement of deep learning has opened new possibilities for automating the analysis of these sounds, allowing for more precise and scalable screening and monitoring tools.

This tutorial is divided into two hands-on parts. In the first part, we will focus on the fundamentals of handling heart sound data, focusing on the processes of data loading, visualization, and preprocessing.

The second part of the tutorial is dedicated to applying deep learning models to heart sound analysis, with a focus on segmentation and classification. In this section, you will explore methods to segment heart sounds into their fundmental components (e.g., S1 sound, systole, S2 sound, and diastole) and classify them to identify the presence of murmurs.

In [None]:
from google.colab import drive
drive.mount('/content/drive')
os.chdir('drive/MyDrive/circor-dataset/')

# Part 1. Visualization

You have access to processed samples from the [2022 CirCor DigiScope dataset](https://physionet.org/content/circor-heart-sound/1.0.3/) already processed in the form of

$$dataset[i, t]= [\text{wavfile}_i, \mathbf{x}_i, \mathbf{y}_i, c_i], i=1, \ldots, N,$$
so that $\text{wavfile}_i \in \Sigma^*$ (i.e., a string with the .wavfile name) , $\mathbf{x}_i \in \mathbb{R}^{T_i}$, $\mathbf{y}_i \in \{1,2,3,4\}^{T_i}, c_i \in \{0, 1, 2\}$. The sampling rate is 4000Hz, meaning that from $t$ to $(t+1)$ $1/4000$ seconds have passed. Note that each sound has a duration $T_i$


In [None]:
dataset = np.load('tutorialdf1.npy', allow_pickle=True)

## Part 2. Inspect dataset
Inspect each column of the dataset

In [None]:
dataset_pandas = pd.DataFrame(dataset)
dataset_pandas.head()

Play close attention to column 0, i.e., the name of wavfile. It has the `patient identifier`, separated by `_`, and the auscultation location `PV, TV, AV, MV, Phc`. This will be important later!

## 1.1 Pick a random sound and use the below function to visualize the sound and its annotation

Remember that each heart state is codified with categorical labels. So
* 1: S1
* 2: Systole
* 3: S2
* 4: Diastole


In [None]:
import numpy as np
import matplotlib.pyplot as plt


def plot_sound_and_label(x, y, sampling_rate=4000):
    assert len(x) == len(y)
    number_of_samples = len(x)
    # Time (duration) = T_i / sample_rate
    time = np.arange(number_of_samples) / sampling_rate

    # Plotting x and y together
    fig, ax1 = plt.subplots(figsize=(12, 6))

    # Plot x on the primary y-axis
    ax1.plot(time, x, label='x (PCG)', color='b')
    ax1.set_xlabel('Time (s)')
    ax1.set_ylabel('Amplitude of o', color='b')
    ax1.tick_params(axis='y', labelcolor='b')

    # Create a secondary y-axis for y
    ax2 = ax1.twinx()
    ax2.step(time, y, label='y (Heart Sattes)', color='r', where='post', linewidth=2)
    ax2.set_ylabel('y (Labels)', color='r')
    ax2.set_yticks([1, 2, 3, 4])
    ax2.tick_params(axis='y', labelcolor='r')

    fig.suptitle('PCG Amplitude and Heart Sound Labels')
    ax1.legend(loc='upper left')
    ax2.legend(loc='upper right')
    plt.show()


In [None]:
sample_idx = 0  # chose an idx i and visualize it
sample = dataset[sample_idx]
plot_sound_and_label(x=sample[1], y=sample[2], sampling_rate=4000)

## 1.2 Mel-frequency anaylsis
Herein we will extract the Mel-frequency ceptrum and the MFCCs coefficients using `librosa`.

View the MFCC and Melspectogram using the provided functions.

In [None]:
def view_melspectogram(sound, sampling_rate=4000, n_mels=128):
  fmax = sampling_rate / 2
  mel_spectrogram = librosa.feature.melspectrogram(y=sound.astype(np.float32),
                                                   sr=sampling_rate,
                                                   n_mels=n_mels,
                                                   fmax=fmax)

  # Convert to decibel (dB) scale
  mel_spectrogram_db = librosa.power_to_db(mel_spectrogram, ref=np.max)

  # Plot the Mel spectrogram
  plt.figure(figsize=(10, 4))
  librosa.display.specshow(mel_spectrogram_db,
                           sr=sampling_rate,
                           x_axis='time',
                           y_axis='mel',
                           fmax=fmax)
  plt.colorbar(format='%+2.0f dB')
  plt.title('Mel Spectrogram')
  plt.tight_layout()
  plt.show()


In [None]:
view_melspectogram(sound=sample[1], sampling_rate=4000)

In [None]:
def get_mfcc(x,
             *,
              n_fft,
              window_length,
              hop_length,
              n_mfcc,
              fmin=25,
              fmax=400,
              sampling_rate=4000):
    S = librosa.feature.melspectrogram(y=x.astype(np.float32).squeeze(),
                                        n_fft=n_fft,
                                        sr=sampling_rate,
                                        win_length=window_length,
                                        hop_length=hop_length,  # samples
                                        fmin=fmin,
                                        fmax=fmax,
                                        window='hann')
    # Convert to log scale (dB).
    log_S = librosa.amplitude_to_db(S, ref=np.max)
    mfcc = librosa.feature.mfcc(S=log_S, n_mfcc=n_mfcc)
    return mfcc

In [None]:
def visualize_mfcc(x, hop_length, sampling_rate=4000):
    plt.figure(figsize=(10, 6))
    librosa.display.specshow(x,
                             x_axis='time', sr=sampling_rate, hop_length=hop_length)
    plt.colorbar(format='%+2.0f dB')
    plt.title('MFCC')
    plt.xlabel('Time (s)')
    plt.ylabel('MFCC Coefficients')
    plt.tight_layout()
    plt.show()

In [None]:
sampling_rate = 4000
x_mfcc = get_mfcc(sample[1],
                  n_mfcc=8,
                  n_fft=512,
                  window_length=int(0.0125 * sampling_rate),  # 80 ms
                  hop_length=int(0.025 * sampling_rate),  # frame-shift 20 ms => overlap 60 ms
                  fmin=30,
                  fmax=500)

visualize_mfcc(x_mfcc,
               hop_length=int(0.025 * sampling_rate),
               sampling_rate=sampling_rate)

# 2. Pre-processing
You will start to get your hands dirty here.
We need to develop a pipeline for our models.


# 2.1.1 Band-pass filtering
Select which low and high pass frenquecies we want to filter. We are specifically using [Butterworth filters](https://en.wikipedia.org/wiki/Butterworth_filter) for this purpose.

 Can you survey the literature for good options?

In [None]:
def plot_filter_responses(low_pass_fs, high_pass_fs, sampling_rate=4000, filter_order=2):
    """
    Plots the frequency responses of a highpass and a lowpass Butterworth filter.

    Parameters:
    - low_pass_fs: float, the cutoff frequency for the highpass filter (Hz)
    - high_pass_fs: float, the cutoff frequency for the lowpass filter (Hz)
    - sampling_rate: float, the sampling rate of the signals (Hz)
    - filter_order: int, the order of the Butterworth filter (default is 2)
    """
    import numpy as np
    import matplotlib.pyplot as plt
    from scipy.signal import butter, sosfreqz

    # Design the highpass filter
    sos_hp = butter(N=filter_order, Wn=low_pass_fs, btype='highpass', analog=False, fs=sampling_rate, output='sos')
    # Design the lowpass filter
    sos_lp = butter(N=filter_order, Wn=high_pass_fs, btype='lowpass', analog=False, fs=sampling_rate, output='sos')

    # Frequency response for the highpass filter
    w_hp, h_hp = sosfreqz(sos_hp, fs=sampling_rate)
    # Frequency response for the lowpass filter
    w_lp, h_lp = sosfreqz(sos_lp, fs=sampling_rate)

    # Plot the frequency response of both filters
    plt.figure(figsize=(12, 6))

    # Plot for the highpass filter
    plt.subplot(2, 1, 1)
    plt.plot(w_hp, 20 * np.log10(np.abs(h_hp)), label='Highpass Filter')
    plt.title('Highpass Filter Frequency Response')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Amplitude (dB)')
    plt.grid()
    plt.axvline(low_pass_fs, color='red', linestyle='--', label=f'Cutoff: {low_pass_fs} Hz')
    plt.legend()

    # Plot for the lowpass filter
    plt.subplot(2, 1, 2)
    plt.plot(w_lp, 20 * np.log10(np.abs(h_lp)), label='Lowpass Filter')
    plt.title('Lowpass Filter Frequency Response')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Amplitude (dB)')
    plt.grid()
    plt.axvline(high_pass_fs, color='red', linestyle='--', label=f'Cutoff: {high_pass_fs} Hz')
    plt.legend()

    plt.tight_layout()
    plt.show()

We suggest starting with a low-pass cutoff frequency of 100 Hz and high-pass cutoff frequency of 500 Hz.
Visualize the filter responses using `plot_filter_responses`.

In [None]:
plot_filter_responses(low_pass_fs=50, high_pass_fs=500)

## 2.1.1.Bonus - Can you search the literature for more adequate cutoff frequencies? Plot the results of your inquiries.

## 2.1.1.a. Filter the dataset
Iterate through all the samples in the dataset and apply the filters using the above-specified filters. Use the function `filter_sounds` with the desired low-pass and high-pass filter frequencies to do so.

In [None]:
def filter_sounds(data,
                  low_pass_fs,
                  high_pass_fs,
                  sampling_rate=4000):
    filtered_data = []
    for recording in tqdm(data, 'Filtering recordings', total=len(data), leave=True):
        sos_hp = scipy.signal.butter(N=2, Wn=low_pass_fs, btype='highpass', analog=False, fs=sampling_rate, output='sos')
        sos_lp = scipy.signal.butter(N=2, Wn=high_pass_fs, btype='lowpass', analog=False, fs=sampling_rate, output='sos')
        filtered = scipy.signal.sosfilt(sos_hp, recording)
        filtered = scipy.signal.sosfilt(sos_lp, filtered)
        filtered_data.append(filtered)
    return np.array(filtered_data, dtype=object)

In [None]:
sounds = dataset[:, 1]
filtered_sounds = filter_sounds(sounds, ...)

In [None]:
sample_filtered = dataset[sample_idx]
plot_sound_and_label(x=sample[1], y=sample[2])
plot_sound_and_label(x=sample_filtered[1], y=sample[2])

Let us assign the filtered sound to the respective dataset column.

**Only run this cell when you are certain of your filtering startegy. If you make a mistake you might have to restart the notebook and run every cell**

In [None]:
dataset[:, 1] = filtered_sounds

## 2.1.2 Denoising -  Averaging Theory
The section of denoising in this tutorial follows [Messer et al.](https://www.sciencedirect.com/science/article/pii/S0026269201000957).


Suppose a Source $\mathbf{S}$ is corrupted additively by i.i.d. Gaussian noise $\epsilon_i$. We observe $\mathbf{X}$ and not $\mathbf{S}$ in our measurements such that:

$$X_i = S + \epsilon_i, i=1,\ldots, N$$

If one computes its variance:

$$\text{Var}\left(\mathbf{S} + \frac{1}{N} \sum_{j=1}^{N}  \epsilon_j\right) = \text{Var}(\mathbf{S}) + \text{Var}\left(\frac{1}{N} \sum_{j=1}^{N}  \epsilon_j \right) = \text{Var}(\mathbf{S}) + \frac{T_i\sigma^2}{N^2}= \text{Var}(\mathbf{S}) + \frac{\sigma^2}{N}$$

one observes that the **standard deviation** of the random terms will shrink as $\frac{\sigma}{\sqrt(N)}$.

Due to the periodic nature of heart sounds and their stationarity (at least in a "short" period of time), we can think of $\mathbf{S}$ to be the expected waveform of the heart cycle in a recording, i.e. the *characteristic heart cycle* of a patient.

## 2.1.3 Wavevet Denoising

In practice, the analysis of a characteristic heartbeat for most downstream
applications is not very useful. Even under (quasi)-stationary assumptions, there may be other phenomena present in the signal such as murmurs that may occur in all states of the heart cycles and across several frequency bands. These phenomena may also be **transient** which immediately defeats the purpose of a characteristic heartbeat.



The Wavelet decomposition will allows us to filter the original signal in an adaptitive way. It allows one to make a trade-off between frequency and time resolution as a function of scale.

The Discrete Wavelet Transform is given by:
$$ \text{DWT}_x^{\psi}(m, n) = \sum_{t} x(t) \psi^*_{m, n}(t) = \sum_{t} x(t) \psi^*\left(\frac{t - n 2^m}{2^m}\right)$$ using a precision of $2^{-m}$, or $m$ bits. $\psi^*$ is the so-called *mother Wavelet function*.

The following code provides visualization of the Haar Wavelet decomposition using 5 levels, i.e. a floating point precision of 5. We will analyse an actual heart sound of our dataset.





In [None]:
idx = 1
max_duration = 1000
n_levels = 5
coeffs = pywt.wavedec(filtered_sounds[0][:max_duration], wavelet='haar', level=n_levels)
# Plot the original signal alongside the wavelet coefficients
plt.figure(figsize=(12, 12))

# Plot the original noisy signal
plt.subplot(n_levels + 2, 1, 1)
plt.plot(filtered_sounds[0][:max_duration], color='green')
plt.title('Original Noisy Signal')
plt.grid(True)

# Plot the approximation coefficients at the highest level
plt.subplot(n_levels + 2, 1, 2)
plt.plot(coeffs[0], color='blue')
plt.title(f'Haar Approximation Coefficients (Level {n_levels})')
plt.grid(True)

# Plot the detail coefficients for each level
for i in range(1, n_levels + 1):
    plt.subplot(n_levels + 2, 1, i + 2)
    plt.plot(coeffs[i], color='red')
    plt.title(f'Haar Detail Coefficients (Level {n_levels - i + 1})')
    plt.grid(True)

plt.tight_layout()
plt.show()


## 2.1.3.Bonus: Other wavelet mother wavelet functions
Can you experiment with other mother wavelet functions? Search the literature for particularly effective ones for PCG recordings.

## 2.1.3.a - Universal Thresholding
Since the DWT provides a multiresolution decomposition of the signal, we can use our **prior knowledge** that *most of the information is concentrated in the low frequencies* to mitigate noise in an adaptative fashion. Rembember, we may not want to get rid of all high frequency content necesseraly!



Suppose that the detail coefficients at the **finest scale** are distributed according to a standard Gaussian scaled by $\sigma$. Then, using [extreme value theory](https://nobel.web.unc.edu/wp-content/uploads/sites/13591/2019/11/Gaussian_Extremes-1.pdf), the largest detail coefficient is:

$$\max_{i=1, \ldots, n} |D_i| \approx O(\sigma\sqrt{2\log n})$$

The higher the frequency, the more sensitive to small perturbations our estimates will be, hence we are looking for a robust estimator of $\sigma$. Typically one uses the **median absolute deviation** estimate of the Gaussian:

$$\sigma \approx \frac{\text{median}(|D|)}{0.6745}$$



Now, we only need to define the thresholding function. We will implement the following soft-thresholding:

$$\hat{D}_j = \text{sign}(D_j) \cdot \max(|D_j| - \lambda, 0)$$

where $\lambda = \sigma\sqrt{2\log n}$. This effectively zeroes out coefficients smaller or equal to $\lambda$ and shifts the remaining $D_j$s towards 0 by $\lambda$.


Implement a function that receives the recordings and outputs the a filtered version of the signal. Use `PyWavelets` package to do so, using `pywt.wavedec`to decompose the signal and `pywt.waverec` to reconstruct the signal (after applying universal thresholding to the coefficients).

Run the cell below to see an example

In [None]:
def soft_threshold(coeff, threshold):
        return np.sign(coeff) * np.maximum(np.abs(coeff) - threshold, 0)
idx = 1
max_duration = 1000
n_levels = 5
coeffs = pywt.wavedec(filtered_sounds[0][:max_duration], wavelet='haar', level=n_levels)
# Plot the original signal alongside the wavelet coefficients
plt.figure(figsize=(12, 12))

# Define a threshold value (e.g., universal threshold)
sigma = np.median(np.abs(coeffs[-1])) / 0.6745  # Estimating noise level
threshold = sigma * np.sqrt(2 * np.log(len(filtered_sounds[0][:max_duration])))

# Apply soft thresholding to the detail coefficients
coeffs_thresholded = [coeffs[0]]  # Keep approximation coefficients unchanged
for coeff in coeffs[1:]:
    coeffs_thresholded.append(soft_threshold(coeff, threshold))


# Plot the original noisy signal
plt.subplot(n_levels + 2, 1, 1)
plt.plot(filtered_sounds[0][:max_duration], color='green')
plt.title('Original Noisy Signal')
plt.grid(True)

# Plot the approximation coefficients at the highest level
plt.subplot(n_levels + 2, 1, 2)
plt.plot(coeffs[0], color='blue')
plt.title(f'Haar Approximation Coefficients (Level {n_levels})')
plt.grid(True)

# Plot the detail coefficients for each level
for i in range(1, n_levels + 1):
    plt.subplot(n_levels + 2, 1, i + 2)
    plt.plot(coeffs[i], color='red', label='original')
    plt.plot(coeffs_thresholded[i], color='purple', label='after universal threshold')
    plt.title(f'Haar Detail Coefficients (Level {n_levels - i + 1})')
    plt.grid(True)
    plt.legend()

plt.tight_layout()
plt.show()


## 2.1.3.b. Bonus: design your own $\sigma$ threshold

We computed $\sigma$ as a function of the detail coefficients at the finer scale. This could be improved upon by computing $\sigma$ at each detail level. There are also more sophisticated thresholding strategies in the literature. Can you implement any of them?

## 2.1.3.c Denoise all recordings
Apply your wavelet denoising strategy for all recordings in the dataset.

NOTE: if you designed your own custom filtering strategy you have to adapt `wavelet_denoising` method to support it. We recommend following you follow a similar recipe to `soft_threshold`.


In [None]:
# TODO: dizer para incluirem o wavelet denoise que
def soft_threshold(coeff, threshold):
        return np.sign(coeff) * np.maximum(np.abs(coeff) - threshold, 0)

def wavelet_denoising(sounds, wavelet_family='haar', n_levels=5):
    # Perform the wavelet decomposition
    denoised_sounds = []
    for noisy_signal in tqdm(sounds, 'Wavelet Denoising', total=len(sounds), leave=True):
        coeffs = pywt.wavedec(noisy_signal, wavelet=wavelet_family, level=n_levels)

        # Define a threshold value (e.g., universal threshold)
        sigma = np.median(np.abs(coeffs[-1])) / 0.6745  # Estimating noise level
        threshold = sigma * np.sqrt(2 * np.log(len(noisy_signal)))

        # Apply soft thresholding to the detail coefficients
        coeffs_thresholded = [coeffs[0]]  # Keep approximation coefficients unchanged
        for coeff in coeffs[1:]:
            coeffs_thresholded.append(soft_threshold(coeff, threshold))
        denoised_sounds.append(pywt.waverec(coeffs_thresholded, wavelet_family))

    return np.array(denoised_sounds, dtype=object)

In [None]:
wavelet_family= '' # select a wavelet family
denoised_sounds = wavelet_denoising(dataset[:, 1], wavelet_family=wavelet_family)

Choose a sample and visualize the original sound and the sound after wavelet denoising.

In [None]:
def plot_denoised_signal(dataset, denoised_sounds, sample_idx):
    noisy_signal = dataset[sample_idx, 1]
    denoised_signal = denoised_sounds[sample_idx]
    plt.figure(figsize=(10, 6))
    plt.plot(noisy_signal, label='Noisy Signal')
    plt.plot(denoised_signal, label='Denoised Signal', linewidth=2)
    plt.legend()
    plt.title(f'Signal denoising using {wavelet_family} wavelet')
    plt.show()

# Inspect the results on a sample
sample_idx
plot_denoised_signal(dataset, denoised_sounds, sample_idx)

Asssign the denoised sounds to the appropriate column.


**Remember that this alters your dataset, so run it only when you are 100% once again!**

In [None]:
dataset[:, 1] = denoised_sounds # assign denoised sounds to the recordings in the dataset

# 2.2 Homomorphic Envelope

In this part you will be computing the Hilbert transform and the homomorphic envelope of the denoised heart sound signals. These representations allow separating the spectral components of a signal and thus can be particularly useful in discriminating between low-frequency events (such as the S1 and S2 sounds) from high-frequency noise or artifacts. By applying homomorphic filtering, the heart sound signal is decomposed into its envelope and excitation components.

## 2.2.1.a Hilbert Transform

The inner product between $\sin$ and $\cos$ over $[0, 2\pi]$ is:

$$\int_0^{2\pi} \sin(\omega t)\cos(\omega t) dt
=0$$

These functions are orthogonal, so they encode different information.

Note that $sin(\omega t) = \cos(\omega t + \frac{\pi}{2})$. So this phase shift operation permits one to inspect how the signal varies along a direction that is orthogonal to its own.

The Hilbert transform is given by:

$$x(t)^*= \frac{1}{\pi} \int x(\tau)\cdot \frac{1}{t-\tau}d\tau$$

which is equivalent to $x * h(t)$, where $h(t)=\frac{1}{\pi t}$ is the *Hilbert kernel*. The above function can be seen as an inner product in the frequency domain due to the Convolution theorem; the Fourier transform of $h(t)$ is $-j\, \text{sgn}\omega$, so this kernel effectively shifts the phase by $\pm$90ยบ of each frequency component of $x$.

The *analytic* signal is thus given by

$$ \hat{x}(t)=x(t) + jx^*(t)$$
where $x$ and $x^*$ can be seen as two basis functions, and $\hat{x}$ the linear combination $[1, j]^\top$ of them.

The Hilbert envelope is given by

$$H(t)= |\hat{x}(t)| := \sqrt{x(t)^2 + x^* (t)^2 }$$
i.e., the Euclidean norm in the Complex plane. This is also called the *magnitude* of the analytic signal.


Use `scipy.signal.hilbert` to compute the analytic signal.
Extract the Hilbert envelope.

In [None]:
sampling_frequency = 4000
# Input signal (replace with your actual signal)
sample_idx = 5
input_signal = np.array(dataset[sample_idx, 1])

analytic_signal = scipy.signal.hilbert(input_signal)  # Compute the analytic signal
hilbert_envelope = np.abs(analytic_signal)  # Get the envelope

## 2.2.1.b Visualize the Hilbert Transform

In [None]:
def view_hilbert_transform(input_signal, analytic_signal, amplitude_envelope, n_samples=500):
    # Plotting the original signal, the analytic signal, and the envelope
    plt.figure(figsize=(10, 6))

    # Plot the original signal
    plt.subplot(3, 1, 1)
    plt.plot(input_signal[:n_samples], label='Original Signal')
    plt.title('Original Signal')
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')

    # Plot the real and imaginary parts of the analytic signal
    plt.subplot(3, 1, 2)
    plt.plot(analytic_signal.real[:n_samples], label='Real part of Analytic Signal')
    plt.plot(analytic_signal.imag[:n_samples], label='Imaginary part of Analytic Signal', linestyle='--')
    plt.title('Analytic Signal (Real and Imaginary Parts)')
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')
    plt.legend()

    # Plot the amplitude envelope
    plt.subplot(3, 1, 3)
    plt.plot(input_signal[:n_samples], label='Original Signal')
    plt.plot(hilbert_envelope[:n_samples], label='Hilbert Envelope', linewidth=2)
    plt.title('Hilbert Envelope')
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
view_hilbert_transform(input_signal, analytic_signal, hilbert_envelope)

## 2.4.3 Homomorphic Filtering

The analytic signal can be described as

$$\hat{x}(t)=A(t)\cdot c(t),$$
i.e., a slowly variying amplitude function $A$ and a **carrier** high frequency signal $c(t)$.

The $\log$-transform makes this relationship additive, so one can effectively use low-pass filters (LPFs) to remove or mitigate high-frequency noise in the carriers.

$$  A_{\text{homomorphic}}(t) = \exp\left(\text{LPF}\{\log H(t)\}\right) $$

Use an order 2 low-pass Butterworth filter at 640 Hz to compute the Homomorphic envelop. Use `scipy.signal.butter` for this effect. This is parametrization follows from the work of [Springer et al.](https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=7234876)

Remember that the sampling rate is 4000Hz. Visaulize the envelope using `view_envelograms`

In [None]:
sampling_frequency = 4000
# Define filter parameters
lpf_frequency = 640   # Since we are using 4KHz (8 * (4000/50))


B_low, A_low = scipy.signal.butter(N=1, Wn=2 * lpf_frequency / sampling_frequency, btype='low')
# Apply the low-pass filter on the log of the hilbert envelope
homomorphic_envelope = np.exp(scipy.signal.filtfilt(B_low,
                                                    A_low,
                                                    np.log(hilbert_envelope)))


In [None]:
def view_envelograms(input_signal,
                           analytic_signal,
                           hilbert_envelope,
                           homomorphic_envelope,
                           n_samples=500):
    # Plotting the original signal, the analytic signal, and the envelope
    plt.figure(figsize=(10, 6))

    # Plot the original signal
    plt.subplot(4, 1, 1)
    plt.plot(input_signal[:n_samples], label='Original Signal')
    plt.title('Original Signal')
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')

    # Plot the real and imaginary parts of the analytic signal
    plt.subplot(4, 1, 2)
    plt.plot(analytic_signal.real[:n_samples], label='Real part of Analytic Signal')
    plt.plot(analytic_signal.imag[:n_samples], label='Imaginary part of Analytic Signal', linestyle='--')
    plt.title('Analytic Signal (Real and Imaginary Parts)')
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')
    plt.legend()

    # Plot the amplitude envelope
    plt.subplot(4, 1, 3)
    plt.plot(input_signal[:n_samples], label='Original Signal')
    plt.plot(hilbert_envelope[:n_samples], label='Hilbert Envelope', linewidth=2)
    plt.title('Hilbert Envelope')
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')
    plt.legend()

    plt.subplot(4, 1, 4)
    plt.plot(input_signal[:n_samples], label='Original Signal')
    plt.plot(homomorphic_envelope[:n_samples], label='Amplitude Envelope', linewidth=2)
    plt.title('Homomorphic Envelope (Springer et al.)')
    plt.xlabel('Time [s]')
    plt.ylabel('Amplitude')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
view_envelograms(input_signal, analytic_signal, hilbert_envelope, homomorphic_envelope)

## 2.4.3.Bonus
Search the literature for other envelograms used for PCG segmentation. Can you implement them? What different features do they capture?

## 2.4.3 Compute the Normalized envelograms
Normalize each envelope for each recording so it has mean 0 and unit variance.

Note that you need to adapt the code hereinafter should you wish to include your custom envelograms.

In [None]:
normalize_signal = lambda x: (x - np.mean(x)) / np.std(x)
normalized_hilbert = normalize_signal(hilbert_envelope)
normalized_homomorphic = normalize_signal(homomorphic_envelope)

plt.plot(normalized_hilbert)
plt.plot(normalized_homomorphic)

# 2.4.3 Implement the *normalized* envelops for all dataset.
The new dataset should have the following columns in order:

*   recording name
*   filtered PCG
*   normalized Hilbert envelogram
*   normalized Homomorphic envelogram
*   any additional bonus normalized envelograms
*   heart sound sequential labels
*   murmur categorical labels

In this order.

In [None]:
normalize_signal = lambda x: (x - np.mean(x)) / np.std(x)
def extract_evelograms(dataset, sampling_rate=4000, low_pass_fs=640):
    dataset_out = np.zeros([dataset.shape[0], 6], dtype=object)
    dataset_out[:, :4] = dataset

    B_low, A_low = scipy.signal.butter(N=1, Wn=2 * low_pass_fs / sampling_rate, btype='low')
    for i, recording in tqdm(enumerate(dataset[:, 1]), 'Extracting Envelograms', total=len(dataset), leave=True):
        analytic_signal = scipy.signal.hilbert(recording)  # Compute the analytic signal
        amplitude_envelope = np.abs(analytic_signal)
        homomorphic_envelope = np.exp(scipy.signal.filtfilt(B_low, A_low, np.log(amplitude_envelope)))
        # any custom envelograms here

        # last two columns for labels!
        dataset_out[i, -2] = normalize_signal(amplitude_envelope)
        dataset_out[i, -1] = normalize_signal(homomorphic_envelope)
    return dataset_out

In [None]:
# This code has to be adapted if you have more than 2 envelograms!!!
dataset_out = extract_evelograms(dataset)
new_order = dataset_out[:, [1, 4, 5, 2, 3]]


dataset_out[:, 1:] = new_order

## 2.5. Downsample
Resample the all signals from 4000 Hz to 50 Hz, i.e.,  the envelograms and heart sound sequential labels of the dataset.

Downsampling to 50 Hz is of  fundamental importance for the following segmentation steps (to be implemented in the second part of the tutorial), in order to reduce the computational complexity of the segmentator.

Note that the labels are discrete.

If you have any additional envelograms, do not forget to adapt this function.

In [None]:
# TODO: ja dado
def resample_signal(dataset,
                    original_rate=4000,
                    new_rate=50):
    for i in tqdm(range(len(dataset)), # columns with features
                                          'Resampling recordings', total=len(dataset), leave=True):
        sound = dataset[i, 1]
        hilb = dataset[i, 2]
        homo = dataset[i, 3]
        label = dataset[i, 4]
        time_secs = len(sound) / original_rate
        number_of_samples = int(time_secs * new_rate)

        # Resample sound (continuous)
        dataset[i, 1] = scipy.signal.resample(sound, number_of_samples).squeeze()
        dataset[i, 2] = scipy.signal.resample(hilb, number_of_samples).squeeze()
        dataset[i, 3] = scipy.signal.resample(homo, number_of_samples).squeeze()

        # Ensure labels are in a numeric format (int or float)
        label = np.array(label, dtype=np.int32)  # Convert labels to integers
        # Create new time indices for the resampled labels
        original_indices = np.arange(len(label))
        new_indices = np.linspace(0, len(label) - 1, number_of_samples)

        # Resample labels by nearest-neighbor interpolation
        resampled_label = np.round(np.interp(new_indices, original_indices, label)).astype(int)
        resampled_label = np.clip(resampled_label, 1, 4)  # Ensure values are within [1, 2, 3, 4]
        dataset[i, 4] = resampled_label.squeeze()

    return dataset

In [None]:
dataset_out_resampled = resample_signal(dataset_out.copy())

In [None]:
sample_idx = 0
sample = dataset_out_resampled[sample_idx]
plot_sound_and_label(x=sample[1], y=sample[4])

#  2.6. Splits per patient
We will now generate the train, validation, and test splits for the second part of the tutorial. If you have custom features that you wish to include pay extra attention and adapt the code when needed.

We will fetch patient recordings by parsing the wavfile name. We will produce three sets `train`, `val`, and `test` so that they are mutually exclusive in terms of patient ids.

Remember from 2. that each wavfile has the patient_id and ausculation location divided seperated by characterd `_`. We will use this to fetch all unique patient identifiers.

In [None]:
def patient_ids_only(patients_col):
        def pattern_match(pattern, string):
            start, end = re.search(pattern, string).span()
            return string[start:end]

        return np.array([int(pattern_match('\d*', patient)) for patient in patients_col])

patient_ids = patient_ids_only(dataset[:, 0])

We will add the patient_ids as a column to the dataset.

In [None]:
dataset_out_final = np.zeros([dataset_out_resampled.shape[0], 7], dtype=object)
dataset_out_final[:, 1:] = dataset_out_resampled
dataset_out_final[:, 0] = patient_ids

# Use the below function to create train val and test splits
Use a test_split size of 20%

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split


def extract_and_save_splits(data, split_size=.2):
    # Extract patient IDs (assuming they are in row 0)
    patient_ids = data[:, 0].astype(np.int32)

    # Get the unique patient IDs
    unique_patient_ids = np.unique(patient_ids)
    # Step 1: Split unique patient IDs into train and remaining (val + test)
    train_ids, val_test_ids = train_test_split(unique_patient_ids, test_size=split_size, random_state=42)

    # Step 2: Split val_test_ids into validation and test sets
    val_ids, test_ids = train_test_split(val_test_ids, test_size=0.5, random_state=42)

    # Step 3: Create masks for train, validation, and test based on patient IDs
    train_mask = np.isin(patient_ids, train_ids)
    val_mask = np.isin(patient_ids, val_ids)
    test_mask = np.isin(patient_ids, test_ids)

    # Step 4: Apply the masks to create train, val, and test sets
    train_data = data[train_mask]
    val_data = data[val_mask]
    test_data = data[test_mask]
    print(f'Train size: {len(train_data)}; Val size: {len(val_data)}; Test size: {len(test_data)}')
    np.save('my_train_data.npy', train_data)
    np.save('my_val_data.npy', val_data)
    np.save('my_test_data.npy', test_data)


In [None]:
extract_and_save_splits(dataset_out_final)

# Final submission
Send your .ipynb files to francesco.renna@fc.up.pt as well as 3 slides summarizing your approaches for both tutorials 1 and 2.