Skip to content
Commits on Source (46)
HYDRA_FULL_ERROR=1
OC_CAUSE=1
MPLBACKEND=wxAgg
\ No newline at end of file
MPLCURSORS='{"hover": 1}'
\ No newline at end of file
......@@ -148,7 +148,7 @@ cython_debug/
tests/testfiles
tests/output
/configs/local/custom.yaml
/configs/local/*.yaml
.vscode/tasks.json
.vscode/settings.json
requirements.txt
......
# Nonlinearity Measure - Reference implementation of TS 104063
# Nonlinearity Measure - Reference implementation of ETSI TS 104 063
Code for non-linearity measure for upcoming ETSI TS 104 063. Includes also verification and testing with simple non-linear models.
......@@ -41,6 +41,14 @@ Similar to first example, but NLM is calculated...
- `file_reference` is analyzed from start time 3.0 seconds until end
- `file_measured` is analyzed from start time 0.0 (default) until a duration of 17.0 seconds
Other example:
~~~terminal
> python -m nonlinearity file_measured=measured.wav file_ref=ref.wav fmin=100 fmax=7000 use_activity_threshold=true ir.nperseg=16384 spectrum.nperseg=16384 spectrum.fraction=3
nlm= ...
~~~
See configs/config.yaml for more parameters.
Configuration of the whole project is based on `hydra-core` package, see https://hydra.cc/.
......@@ -48,12 +56,53 @@ Configuration of the whole project is based on `hydra-core` package, see https:/
# API access
See `calculate_nlm_file(..)` for file-based calculations or `calculate_nlm(..)` for numpy/array-based calculations in `nonlinearity.calculate.py`.
# Debug data
Dependencies for showing and saving extended result data are installed as follows:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --extra debug
~~~
Or install all extra dependencies:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --all-extras
~~~
# Advanced
The ITU-T P.56 active speech level implementation in Python is rather slow. You may compile it for better performance.
Ensure that all development dependencies are installed:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --dev
~~~
Compile with e.g., nuitka:
~~~terminal
../ts104063-nonlinearity-measure> uv run nuitka --module nonlinearity/utils/p56/asl.py --include-module=numpy --output-dir=nonlinearity/utils/p56/build
[ ... ]
../ts104063-nonlinearity-measure> uv run nuitka --module nonlinearity/utils/p56/prefilter.py --include-module=numpy --include-module=scipy.signal --output-dir=nonlinearity/utils/p56/build --remove-output
~~~
# References
(list is for sure incomplete; further input is appreciated!)
## Sweep-based system identification
https://ant-novak.com/posts/research/2015-10-30_JAES_Swept/
https://hal.science/hal-02504303/document
https://www.melaudia.net/zdoc/sweepSine.PDF
## Non-linearities & Modelling
https://www.researchgate.net/publication/286670404_Characterisation_and_modelling_of_non-linear_loudspeakers
https://www.hsu-hh.de/ant/wp-content/uploads/sites/699/2018/04/Eichas_VA_Modeling_of_guitar_amps_with_WH_models.pdf
https://openhsu.ub.hsu-hh.de/server/api/core/bitstreams/0b6f9c6f-c444-45f7-b392-e2ff4188d25b/content
https://hal.science/hal-02504303/document
## Related Distortion Measures
https://www.listeninc.com/wp/media/2023/06/paper_121AES_non_coherent_distortion_Listen_Inc.pdf
https://www.klippel.de/fileadmin/klippel/Files/Know_How/Application_Notes/AN_07_Weighted_Harmonic_Distortion_%28HI-2%29.pdf
\ No newline at end of file
......@@ -40,4 +40,4 @@ fmax: 12000
fs: 48000
# use ASL from P.56 and level-vs-time during impulse response estimate
use_activity_threshold: False
\ No newline at end of file
use_activity_threshold: True
\ No newline at end of file
# spectrum parameters to estimate impulse response
nperseg: 8192
nperseg: 16384
overlap: 0.90
#noverlap:
window: flattop
......
# spectrum parameters to estimate linear-/non-linear components
nperseg: 8192
nperseg: 16384
overlap: 0.75
#noverlap: 6144
window: 'hann'
......
......@@ -5,20 +5,30 @@ Created on May 29 2024 15:58
@author: Jan.Reimes
"""
import pandas
import numpy as np
from numpy.typing import NDArray
from typing import Dict, Any
from functools import partial
from typing import Dict, Any, Optional, Self
from dataclasses import dataclass, asdict
from .spectrum import to_db as _to_db
# constants
MIN_SAMPLERATE = 16000 # 32 kHz? 48 kHz?
MIN_LEVEL_DB: float = -180.0
MIN_POWER_LIN = np.power(10, MIN_LEVEL_DB/10)
MIN_LEVEL_LIN = np.power(10, MIN_LEVEL_DB/20)
FS_GLOBAL = 48000 # Constant sampling rate - see clause 5 in TS 104 063!
MIN_LEVEL_DB: float = -100.0
MIN_LEVEL_RMS = np.power(10, MIN_LEVEL_DB/20)
MIN_LEVEL_POW = np.power(10, MIN_LEVEL_DB/10)
MIN_SPEECH_ACTIVITY = 0.60
DEFAULT_REF_LEVEL_DB = -26.0
MAX_DIFF_SIGNAL_DURATION = 3.0
# helper functions
to_db = partial(_to_db, abs_min_value_db=MIN_LEVEL_DB)
# type definitions
@dataclass
class FreqRespResult:
......@@ -34,7 +44,7 @@ class FreqRespResult:
return asdict(self)
@classmethod
def merge_channels(cls, *args) -> 'FreqRespResult':
def merge_channels(cls, *args) -> Self:
imp_resp = np.array([arg.imp_resp for arg in args]).T
freq = args[0].freq
freq_resp = np.array([arg.freq_resp for arg in args]).T
......@@ -45,18 +55,30 @@ class FreqRespResult:
@dataclass
class NLMDict:
measurement: NDArray
linear: NDArray
nonlinear: NDArray
noise: NDArray
measurement: NDArray|Any
linear: Optional[NDArray|Any] = None
nonlinear: Optional[NDArray|Any] = None
noise: Optional[NDArray|Any] = None
def get_value(self, key: str) -> NDArray|Any:
if hasattr(self, key):
return getattr(self, key)
else:
raise KeyError(f"Key '{key}' not found in NLMDict.")
def set_value(self, key: str, value: NDArray|Any) -> None:
if hasattr(self, key):
object.__setattr__(self, key, value)
else:
raise KeyError(f"Key '{key}' not found in NLMDict.")
def asdict(self) -> Dict[str, NDArray]:
return asdict(self)
@dataclass
class P56Dict:
asl: float
act: float
asl: float = -100.0
act: float = 0.0
def asdict(self) -> Dict[str, float]:
return asdict(self)
......@@ -70,7 +92,6 @@ class NLMResult:
fmax: float
delay: int
max_xcorr: float
wcf: float
p56: P56Dict
fs: int
nbr_channels: int
......@@ -79,5 +100,8 @@ class NLMResult:
def asdict(self) -> Dict[str, Any]:
return asdict(self)
pandas.options.io.excel.xlsx.reader = 'calamine'
pandas.options.io.excel.xlsx.writer = 'xlsxwriter'
if __name__ == "__main__":
pass
......@@ -4,39 +4,76 @@ Created on Sept 03 2024 10:26
@author: Jan.Reimes
"""
import logging
import hydra
from omegaconf import DictConfig
from pathlib import Path
from typing import Any, Dict, List
import hydra
import psutil
from omegaconf import DictConfig, OmegaConf
from rootutils import setup_root
from .calculate import calculate_nlm_file, NLMResult
from .calculate import NLMResult, calculate_nlm_file
from .debug.single_values import save_nlm_results
from .debug.spectral_data import save_spectral_data
from .utils import get_file_pairs
from .utils.resolver import register_all_resolvers
register_all_resolvers() # needs to be called before using OmegaConf
@hydra.main(version_base="1.3", config_path="../configs", config_name="config")
def main(cfg: DictConfig):
logger = logging.getLogger("nonlinearity")
try:
# check/resolve input files
cfg['file_measured'] = Path(cfg.file_measured).absolute()
cfg['file_ref'] = Path(cfg.file_ref).absolute()
def nlm_calc(cfg: Dict[str, Any] | DictConfig, measured: str, ref: str) -> NLMResult:
"""Internal function to calculate NLM for a single file pair."""
cfg["file_measured"] = Path(measured).absolute()
cfg["file_ref"] = Path(ref).absolute()
# run calculation
res: NLMResult = calculate_nlm_file(**cfg)
# save spectral data to output?
save_spectral_data(res, cfg)
# save single values to output?
save_nlm_results(res, cfg)
return res
@hydra.main(version_base="1.3", config_path="../configs", config_name="config")
def main(cfg: DictConfig) -> List[NLMResult] | NLMResult:
try:
# check/resolve input files
file_pairs = get_file_pairs(cfg.file_measured, cfg.file_ref)
if (nbr_fp := len(file_pairs)) == 1:
# single file pair, no parallelization needed:
res = nlm_calc(cfg, *file_pairs[0])
logger.info(f"{Path(file_pairs[i][0]).name}\tNLM = {res.nlm.round(1)} dB")
return res
else:
# run in parallel for multiple file pairs
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib
n_workers = max(1, min(nbr_fp, cfg.get("n_jobs", psutil.cpu_count() // 2)))
results: List[NLMResult] = []
with tqdm_joblib(desc="NLM calculation", total=len(file_pairs)) as pb:
jobs = (delayed(nlm_calc)(OmegaConf.to_object(cfg), measured, ref) for measured, ref in file_pairs) # add copy of config to each pair
for i, res in enumerate(Parallel(n_jobs=n_workers, return_as="generator")(jobs)):
pb.set_description_str(f"{Path(file_pairs[i][0]).name}\tNLM = {res.nlm.round(1)} dB")
results.append(res)
return results
except Exception as e:
logger.exception(f"Error during calculation: {e}")
raise e
if __name__ == "__main__":
setup_root(__file__, pythonpath=True)
main()
......@@ -5,40 +5,48 @@ Created on Sept 05 2024 17:53
@author: Jan.Reimes
"""
import logging
from pathlib import Path
from scipy.signal import freqz
from typing import Any, Dict, Iterable, Optional
import numpy as np
from numpy.typing import NDArray
from typing import Optional, Union, Iterable, Dict, Any
from ensure import Check
import soundfile as sf
import soxr
import logging
from ensure import Check
from numpy.typing import NDArray
from . import NLMResult, NLMDict, P56Dict, MIN_SAMPLERATE, MIN_POWER_LIN, MAX_DIFF_SIGNAL_DURATION, FreqRespResult
from . import DEFAULT_REF_LEVEL_DB, FS_GLOBAL, MAX_DIFF_SIGNAL_DURATION, MIN_LEVEL_RMS, FreqRespResult, NLMDict, NLMResult, P56Dict
from .estimate_ir import estimate_ir_csd
from .spectrum import FftSpectrumVsTime
from .utils.delay import calc_delay, compensate_delays
from .utils.filter import lfilter_block
from .utils.p56.asl import calculate_p56_asl
from .estimate_ir import estimate_ir_csd
from .spectrum import calculate_spectrum
from .utils.misc import interpolate_spectrum
log = logging.getLogger(__name__)
try:
from .utils.p56.build.asl import calculate_p56_asl
except ImportError:
log.info("Using native P.56 ASL calculation. Consider compiling it for better performance.")
from .utils.p56.asl import calculate_p56_asl
def calculate_nlm_file(file_measured: Path, file_ref: Path, fs: Optional[int] = None,
ch_measured: Optional[Union[int, Iterable[int]]] = None,
def calculate_nlm_file(
file_measured: Path,
file_ref: Path,
fs: Optional[int] = FS_GLOBAL,
ch_measured: Optional[int | Iterable[int]] = None,
ch_ref: Optional[int] = 1,
time_start_measured: float = 0.0, time_duration_measured: Optional[float] = None,
time_start_ref: float = 0.0, time_duration_ref: Optional[float] = None,
**kwargs) -> NLMResult:
time_start_measured: float = 0.0,
time_duration_measured: Optional[float] = None,
time_start_ref: float = 0.0,
time_duration_ref: Optional[float] = None,
**kwargs,
) -> NLMResult:
# check wave files
file_measured = Path(file_measured).absolute()
file_ref = Path(file_ref).absolute()
Check(file_measured.is_file()).is_true().or_raise(FileNotFoundError, str(file_measured))
Check(file_ref.is_file()).is_true().or_raise(FileNotFoundError, str(file_ref))
dtype = kwargs.get('dtype', 'float32')
dtype = kwargs.get("dtype", "float32")
# use file info for several checks
info_m = sf.info(file_measured)
......@@ -46,73 +54,88 @@ def calculate_nlm_file(file_measured: Path, file_ref: Path, fs: Optional[int] =
# check channel numbers
ch_ref: int = 1 if ch_ref is None else int(ch_ref)
(Check(ch_ref).is_less_than_or_equal_to(info_r.channels).also.is_greater_than_or_equal_to(1).
or_raise(ValueError, f"Reference wave file has {info_r.channels} channel(s), but ch_ref={ch_ref} was specified."))
(
Check(ch_ref)
.is_less_than_or_equal_to(info_r.channels)
.also.is_greater_than_or_equal_to(1)
.or_raise(
ValueError,
f"Reference wave file has {info_r.channels} channel(s), but ch_ref={ch_ref} was specified.",
)
)
ch_measured: NDArray = 1 + np.arange(info_m.channels) if ch_measured is None else np.atleast_1d(ch_measured).astype(int) # default: all channels
(Check(ch_measured.min()).is_greater_than_or_equal_to(1).
or_raise(ValueError,
f"Measured wave file has {info_m.channels} channels, "
f"but max(ch_measured)={ch_measured.max()} was specified."))
Check(ch_measured.max()).is_less_than_or_equal_to(
info_m.channels).or_raise(ValueError,
f"Measured wave file has {info_m.channels} channels, but max(ch_measured)={ch_measured.max()} was specified."
(
Check(ch_measured.min())
.is_greater_than_or_equal_to(1)
.or_raise(
ValueError,
f"Measured wave file has {info_m.channels} channels, but max(ch_measured)={ch_measured.max()} was specified.",
)
)
# determine common sampling rate
fs_m = info_m.samplerate
fs_r = info_r.samplerate
if fs is None:
fs = min(fs_m, fs_r)
else:
fs = int(fs)
Check(fs).is_greater_than_or_equal_to(MIN_SAMPLERATE).or_raise(
ValueError, f'Samplerate must be at least {MIN_SAMPLERATE}, but is {fs}')
Check(ch_measured.max()).is_less_than_or_equal_to(info_m.channels).or_raise(
ValueError,
f"Measured wave file has {info_m.channels} channels, but max(ch_measured)={ch_measured.max()} was specified.",
)
# load wave files
measured, fs_m = sf.read(file_measured, start=round(fs_m * time_start_measured), always_2d=True,
stop=round(time_duration_measured * fs_m) if time_duration_measured else None, dtype=dtype)
reference, fs_r = sf.read(file_ref, start=round(fs_r * time_start_ref), always_2d=True,
stop=round(time_duration_ref * fs_r) if time_duration_ref else None, dtype=dtype)
fs_m = info_m.samplerate
fs_r = info_r.samplerate
measured, fs_m = sf.read(
file_measured,
start=round(fs_m * time_start_measured),
always_2d=True,
stop=round(time_duration_measured * fs_m) if time_duration_measured else None,
dtype=dtype,
)
reference, fs_r = sf.read(
file_ref,
start=round(fs_r * time_start_ref),
always_2d=True,
stop=round(time_duration_ref * fs_r) if time_duration_ref else None,
dtype=dtype,
)
# limit channels
measured = measured[:, ch_measured - 1]
reference = reference[:, ch_ref - 1]
# check resampling
resample_quality = kwargs.get('resample_quality', 'VHQ')
if fs != fs_m:
fs = fs or FS_GLOBAL
resample_quality = kwargs.get("resample_quality", "VHQ")
if fs_m != FS_GLOBAL:
measured = soxr.resample(measured, fs_m, fs, quality=resample_quality)
if fs != fs_r:
if fs_r != FS_GLOBAL:
reference = soxr.resample(reference, fs_r, fs, quality=resample_quality)
# check duration
max_len = max(measured.shape[0], reference.shape[0])
min_len = min(measured.shape[0], reference.shape[0])
max_diff_samples = round(MAX_DIFF_SIGNAL_DURATION * fs)
if np.abs(max_len-min_len) > max_diff_samples:
log.warning(f"Duration difference between measured and reference signal "
f"is larger than {MAX_DIFF_SIGNAL_DURATION} s. Consider adjusting time_start and time_duration of "
f"measured and/or reference signal.")
return calculate_nlm(measured, reference, fs, **kwargs)
# sf.write(r"d:\SVN\Projects\ts104063-nonlinearity-measure\data\TR-41\tmp.wav", np.stack((measured.squeeze(), reference), axis=1), FS_GLOBAL)
return calculate_nlm(measured, reference, fs=fs, **kwargs)
def calculate_nlm(measurement: NDArray, reference: NDArray, fs: int, fmin: float = 50.0, fmax: float = 16_000.0,
compensate_delay: bool = True, interpolate_zeros: bool = False,
def calculate_nlm(
measurement: NDArray,
reference: NDArray,
fmin: float = 50.0,
fmax: float = 16_000.0,
fs: Optional[int] = FS_GLOBAL,
compensate_delay: bool = True,
interpolate_zeros: bool = False,
delay: Optional[Dict[str, Any]] = None, # options for delay compensation
use_activity_threshold: bool = True, # use activity threshold from P.56
ir: Optional[Dict[str, Any]] = None, # options for impulse response estimation
filter: Optional[Dict[str, Any]] = None, # options for filtering reference with ir
spectrum: Optional[Dict[str, Any]] = None, # options for spectral non-linearity estimate
**kwargs) -> NLMResult:
**kwargs,
) -> NLMResult:
### MAIN ALGORITHM ###
# Preparations/check input arguments
ref_level_db = kwargs.get("ref_level_db", DEFAULT_REF_LEVEL_DB)
fs = round(fs or FS_GLOBAL)
Check(fs).equals(FS_GLOBAL).or_raise(ValueError, f"Invalid sampling frequency fs={fs} Hz - must be {FS_GLOBAL} Hz.")
delay = dict(**(delay or {}))
ir = dict(**(ir or {}))
filter = dict(**(filter or {}))
......@@ -131,7 +154,7 @@ def calculate_nlm(measurement: NDArray, reference: NDArray, fs: int, fmin: float
Check(len(reference.shape)).equals(1).or_raise(ValueError, f"Invalid shape: {reference.shape}")
# c) check fs vs fmax
Check(fmax).is_less_than_or_equal_to(fs/2).or_raise(ValueError, f"fmax={fmax:.1f} Hz is larger than fs/2 (fs={int(fs)} Hz)")
Check(fmax).is_less_than_or_equal_to(fs / 2).or_raise(ValueError, f"fmax={fmax:.1f} Hz is larger than fs/2={int(fs / 2)} Hz)")
# Clause 5.2: compensate delay
delays, max_xcorr = np.zeros(measurement.shape[1]), np.zeros(measurement.shape[1])
......@@ -140,13 +163,24 @@ def calculate_nlm(measurement: NDArray, reference: NDArray, fs: int, fmin: float
measurement = compensate_delays(measurement, delays, fs, reference.shape[0])
# Clause 5.2: ensure same length
min_len = min(measurement.shape[0], reference.shape[0])
min_len, max_len = (
max(measurement.shape[0], reference.shape[0]),
min(measurement.shape[0], reference.shape[0]),
)
max_diff_samples = round(MAX_DIFF_SIGNAL_DURATION * fs)
if np.abs(max_len - min_len) > max_diff_samples:
log.warning(
f"Duration difference between measured and reference signal "
f"is larger than {MAX_DIFF_SIGNAL_DURATION} s. Consider adjusting time_start and time_duration of "
f"measured and/or reference signal."
)
measurement = measurement[:min_len]
reference = reference[:min_len]
# Clause 5.2: check speech activity
asl, act = calculate_p56_asl(reference, fs)
reference *= np.power(10, (-26.0 - asl)/20)
reference *= np.power(10, (ref_level_db - asl) / 20)
p56_result = P56Dict(asl, act)
# use (in)activity ratio and reference signal for simple activity detection
......@@ -154,10 +188,16 @@ def calculate_nlm(measurement: NDArray, reference: NDArray, fs: int, fmin: float
# Clause 5.3: estimate impulse response between degraded and reference
fr_results = []
min_phase = ir.pop('min_phase', False)
min_phase = ir.pop("min_phase", False)
for i in range(nbr_channels):
fr_result = estimate_ir_csd(measurement[:, i], reference, fs, inactivity_fraction=inactivity_fraction,
min_phase=min_phase, **ir)
fr_result = estimate_ir_csd(
measurement[:, i],
reference,
fs,
inactivity_fraction=inactivity_fraction,
min_phase=min_phase,
**ir,
)
fr_results.append(fr_result)
# merge frequency response results across channels
......@@ -166,62 +206,70 @@ def calculate_nlm(measurement: NDArray, reference: NDArray, fs: int, fmin: float
# Clause 5.4: filter reference signal with estimated IR
reference_flt = np.zeros_like(measurement)
for i in range(nbr_channels):
reference_flt[:, i] = lfilter_block(freq_resp_result.imp_resp[:, i], None, reference, compensate_fir_delay=not min_phase,
**filter)
reference_flt[:, i] = lfilter_block(
freq_resp_result.imp_resp[:, i],
None,
reference,
compensate_fir_delay=not min_phase,
**filter,
)
# Clause 5.5/5.6: transform signals
freq, spectra, spectra_noise_est, wcf = calculate_spectrum(
[reference_flt, measurement, reference], fs, inactivity_fraction=inactivity_fraction,
ref_idx=2, **spectrum
spectra = NLMDict(
measurement=FftSpectrumVsTime.from_signals(measurement, axis=0, fs=fs, **spectrum),
linear=FftSpectrumVsTime.from_signals(reference_flt, axis=0, fs=fs, **spectrum),
)
# skip noise from reference (at list index 0)
spec_noise_est = spectra_noise_est[1]
# experimental feature: interpolate zeros in estimated noise spectrum to avoid singularities
if interpolate_zeros:
spec_noise_est = interpolate_spectrum(spec_noise_est, freq, 0.0)
# optional: to bands vs time
if (fraction := spectrum.get("fraction")) and (fraction > 0):
for key, spec in spectra.asdict().items():
if spec is not None: # and isinstance(spec, FftSpectrumVsTime):
spectra.set_value(key, spec.to_bands_vs_time(fraction=fraction))
# also interpolate in filtered reference and measurement
for i, spec in enumerate(spectra):
spectra[i] = interpolate_spectrum(spec, freq, 0.0)
freq = spectra.measurement.get_freqs()
# correct for FFT window function
wcf_lin = np.power(10, wcf/20)
spec_linear = spectra[0] * wcf_lin
spec_measurement = spectra[1] * wcf_lin
spec_noise_est = spec_noise_est * wcf_lin
# aggregate versus time
noise_percentile = spectrum.get("noise_percentile", 3.0)
spectra.set_value("noise", spectra.measurement.get_noise_estimate(percentile=noise_percentile))
for key in ["measurement", "linear"]:
spectra.set_value(key, spectra.get_value(key).get_aggregate())
# Clause 5.5/5.6:
# - ensure that linear estimate does not exceed measurement (minus noise estimate)
# - subtract estimates of linear and noise spectrum from measurement to obtain non-linearity spectrum
spec_linear = np.minimum(np.maximum(0.0, spec_measurement - spec_noise_est), spec_linear)
spec_nonlin = np.maximum(0.0, spec_measurement - spec_noise_est - spec_linear)
# experimental feature: interpolate zeros in non-linear spectrum to avoid singularities
if interpolate_zeros:
spec_nonlin = interpolate_spectrum(spec_nonlin, freq, 0.0)
spectra.linear = min(spectra.measurement - spectra.noise, spectra.linear)
spectra.nonlinear = spectra.measurement - spectra.noise - spectra.linear
# Clause 5.7: aggregate to single value vs frequency (from fmin to fmax)
idx_min = np.argmin(np.abs(freq - fmin))
idx_max = np.argmin(np.abs(freq - fmax))+1
# TODO: apply weighting to non-linear or estimated noise?
level_nl = 10 * np.log10(np.maximum(MIN_POWER_LIN, np.power(spec_nonlin[idx_min:idx_max], 2).sum(axis=0)))
level_lin = 10 * np.log10(np.maximum(MIN_POWER_LIN, np.power(spec_linear[idx_min:idx_max], 2).sum(axis=0)))
level_meas = 10 * np.log10(np.maximum(MIN_POWER_LIN, np.power(spec_measurement[idx_min:idx_max], 2).sum(axis=0)))
#level_meas_noise_comp = 10 * np.log10(np.power(spec_measurement_noise_comp[idx_min:idx_max], 2).sum(axis=0)))
level_noise = 10 * np.log10(np.maximum(MIN_POWER_LIN, np.power(spec_noise_est[idx_min:idx_max], 2).sum(axis=0)))
levels = NLMDict(None)
for name, spec in spectra.asdict().items():
# TODO: apply weighting to non-linear component or to estimated noise?
levels.set_value(name, spec.get_level("dB", include_window_correction=True))
spectra.set_value(
name,
np.maximum(spec.get_values("rms", include_window_correction=True), MIN_LEVEL_RMS),
)
# final NLM result:
# nlm = level_meas_noise_comp - level_nl
nlm = level_lin - level_nl
nlm = levels.linear - levels.nonlinear
# collect / return results
spectra = NLMDict(spec_measurement, spec_linear, spec_nonlin, spec_noise_est)
levels = NLMDict(level_meas, level_lin, level_nl, level_noise)
return NLMResult(nlm, freq=freq, spectra=spectra, levels=levels, fmin=fmin, fmax=fmax,
p56=p56_result, delay=delays, max_xcorr=max_xcorr, wcf=wcf, nbr_channels=nbr_channels, fs=fs,
freq_resp_result=freq_resp_result
return NLMResult(
nlm,
freq=freq,
spectra=spectra,
levels=levels,
fmin=fmin,
fmax=fmax,
p56=p56_result,
delay=delays,
max_xcorr=max_xcorr,
nbr_channels=nbr_channels,
fs=fs,
freq_resp_result=freq_resp_result,
)
if __name__ == "__main__":
pass
......@@ -5,11 +5,15 @@ Created on Jan 10 2025 09:17
@author: Jan.Reimes
"""
import pandas as pd
import logging
import warnings
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)
import pandas as pd
warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
logger = logging.getLogger("nonlinearity.debug")
if __name__ == "__main__":
pass
pass
......@@ -4,22 +4,32 @@ Created on Jan 10 2025 09:17
@author: Jan.Reimes
"""
import logging
from pathlib import Path
import pandas as pd
import numpy as np
import pandas as pd
from omegaconf import DictConfig
import locket
from .. import NLMResult, NLMDict
from .. import NLMResult
from .utils import get_sweep_overloads
IDX_COLS = ['file_measured', 'file_ref', 'channel']
DATA_COLS = ['NLM', 'p56.asl', 'p56.asl', 'delay', 'max_xcorr']
IDX_COLS = ["file_measured", "file_ref", "channel"]
DATA_COLS = ["NLM", "p56.asl", "p56.act", "delay", "max_xcorr"]
logger = logging.getLogger("nonlinearity.debug")
def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
if (cfg_debug := cfg.get('debug')) and (cfg_single_values := cfg_debug.get('single_values')):
precision = cfg_single_values.get('precision', 2)
if (cfg_debug := cfg.get("debug")) and (cfg_single_values := cfg_debug.get("single_values")):
try:
import locket
except ImportError:
logger.warning("Some packages are missing that are required for saving spectral data. Please run 'uv sync --extra debug' or 'uv sync --all-extras'.")
return
precision = cfg_single_values.get("precision", 2)
# determine index columns; add multi-run/sweep parameters to index, if available
# (exclude sweep parameters that are already in IDX_COLS
......@@ -28,13 +38,13 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
idx_cols += list(sweep_params.keys())
sweep_key = tuple(sweep_params.values())
single_value_file = Path(cfg_single_values.get('single_values_file', 'nlm_single_values.xlsx'))
single_value_file = Path(cfg_single_values.get("single_values_file", "nlm_single_values.xlsx"))
if not single_value_file.is_absolute():
output_dir = Path(cfg['paths']['output_dir']).absolute()
output_dir = Path(cfg["paths"]["output_dir"]).absolute()
single_value_file = Path(output_dir / single_value_file).resolve()
# protect against concurrent write with semaphore/lock
with locket.lock_file(single_value_file.with_suffix('.lock')):
with locket.lock_file(single_value_file.with_suffix(".lock")):
# data storage - load existing (in case of e.g. multiple runs or special output file)
if single_value_file.is_file():
df = pd.read_excel(single_value_file, index_col=np.arange(len(idx_cols)).tolist()).infer_objects()
......@@ -42,21 +52,23 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
df = pd.DataFrame(columns=idx_cols + DATA_COLS).set_index(idx_cols)
for ch_idx in range(results.nlm.shape[0]):
key = (cfg['file_measured'].name, cfg['file_ref'].name, ch_idx+1)
key = (cfg["file_measured"].name, cfg["file_ref"].name, ch_idx + 1)
key = key + sweep_key
df.loc[key, 'NLM'] = np.round(results.nlm[ch_idx], precision)
df.loc[key, "NLM"] = np.round(results.nlm[ch_idx], precision)
for name, values in results.levels.asdict().items():
df.loc[key, f'level.{name}'] = np.round(values[ch_idx], precision)
df.loc[key, f"level.{name}"] = np.round(values[ch_idx], precision)
# other values
df.loc[key, 'delay'] = np.round(results.delay[ch_idx], precision)
df.loc[key, 'max_xcorr'] = np.round(results.max_xcorr[ch_idx], precision)
df.loc[key, 'p56.act'] = np.round(results.p56.act, precision)
df.loc[key, 'p56.asl'] = np.round(results.p56.asl, precision)
df.loc[key, "delay"] = np.round(results.delay[ch_idx], precision)
df.loc[key, "max_xcorr"] = np.round(results.max_xcorr[ch_idx], precision)
df.loc[key, "p56.act"] = np.round(results.p56.act, precision)
df.loc[key, "p56.asl"] = np.round(results.p56.asl, precision)
df.sort_index(inplace=True)
df.to_excel(single_value_file, merge_cells=cfg_single_values.get('merge_cells', False))
df.to_excel(
single_value_file,
merge_cells=cfg_single_values.get("merge_cells", False),
)
if __name__ == "__main__":
......
......@@ -4,50 +4,74 @@ Created on Jan 10 2025 09:36
@author: Jan.Reimes
"""
import logging
from pathlib import Path
import numpy as np
from omegaconf import DictConfig
import h5py
import locket
from .. import NLMResult, NLMDict, MIN_LEVEL_LIN
from .utils import get_sweep_overloads, convert_hdf5_compatible
from .. import MIN_LEVEL_RMS, NLMResult
from .utils import convert_dict_to_hdf5_compatible, get_sweep_overloads, is_hydra_environment
IDX_COLS = ['file_measured', 'file_ref']
IDX_COLS = ["file_measured", "file_ref"]
logger = logging.getLogger("nonlinearity.debug")
def save_spectral_data(results: NLMResult, cfg: DictConfig) -> None:
if (cfg_debug := cfg.get('debug')) and (cfg_spectrum := cfg_debug.get('spectral_data')):
if (cfg_debug := cfg.get("debug")) and (cfg_spectrum := cfg_debug.get("spectral_data")):
try:
import h5py
import locket
except ImportError:
logger.warning("Some packages are missing that are required for saving spectral data. Please run 'uv sync --extra debug' or 'uv sync --all-extras'.")
return
# get metadata for sweep, including file names
sweep_params = {k: cfg[k] for k in IDX_COLS}
sweep_params.update(get_sweep_overloads(cfg, include_params=sweep_params, hdf5_compatible=True))
if is_hydra_environment():
sweep_params.update(get_sweep_overloads(cfg, include_params=sweep_params))
sweep_params = convert_dict_to_hdf5_compatible(sweep_params)
spectral_data_output_file = Path(cfg_spectrum.get('spectral_data_file', 'spectral_data.h5'))
spectral_data_output_file = Path(cfg_spectrum.get("spectral_data_file", "spectral_data.h5"))
if not spectral_data_output_file.is_absolute():
output_dir = Path(cfg['paths']['output_dir']).absolute()
output_dir = Path(cfg["paths"]["output_dir"]).absolute()
spectral_data_output_file = Path(output_dir / spectral_data_output_file).resolve()
# protect against concurrent write with semaphore/lock
with locket.lock_file(spectral_data_output_file.with_suffix('.lock')):
with locket.lock_file(spectral_data_output_file.with_suffix(".lock")):
# data storage - load existing (in case of e.g. multiple runs or special output file)
with h5py.File(spectral_data_output_file, 'a') as f:
with h5py.File(spectral_data_output_file, "a") as f:
# base group
g = f.require_group(cfg['file_measured'].name).require_group(cfg['file_ref'].name)
g = f.require_group(cfg["file_measured"].name).require_group(cfg["file_ref"].name)
# store possible sweep runs with index
next_key = len(g.keys())
g = g.require_group(f'{next_key}')
g = g.require_group(f"{next_key}")
# store sweep parameters as metadata
g.attrs.update(sweep_params)
# store frequency vector
g.require_dataset('freq', data=results.freq, dtype=results.freq.dtype, shape=results.freq.shape, chunks=True)
g.require_dataset(
"freq",
data=results.freq,
dtype=results.freq.dtype,
shape=results.freq.shape,
chunks=True,
)
# iterate over spectrum types
for name, spectrum in results.spectra.asdict().items():
if cfg_spectrum.get('in_db', False):
spectrum = 20 * np.log10(np.maximum(spectrum, MIN_LEVEL_LIN))
g.require_dataset(name, data=spectrum, dtype=spectrum.dtype, shape=spectrum.shape, chunks=True)
if cfg_spectrum.get("in_db", False):
spectrum = 20 * np.log10(np.maximum(spectrum, MIN_LEVEL_RMS))
g.require_dataset(
name,
data=spectrum,
dtype=spectrum.dtype,
shape=spectrum.shape,
chunks=True,
)
if __name__ == "__main__":
......
......@@ -4,32 +4,33 @@ Created on Jan 10 2025 16:13
@author: Jan.Reimes
"""
from types import NoneType
from typing import Any, Dict, Iterable, List, Optional
import numpy as np
from pathlib import Path
from typing import Dict, Any, Optional, List, Iterable
from hydra.core.hydra_config import HydraConf, HydraConfig
from omegaconf import DictConfig, OmegaConf
from hydra.core.hydra_config import HydraConfig, HydraConf
_CONVERT_DTYPES = {
np.dtype('O'): str,
np.dtype('bool'): np.uint8,
NoneType: str
}
from . import logger
_CONVERT_DTYPES = {np.dtype("O"): str, np.dtype("bool"): np.uint8, NoneType: str}
def is_hydra_environment() -> bool:
"""Check if the current environment is a Hydra environment."""
return HydraConfig.instance().cfg is not None # and HydraConfig.instance().job is not None
def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = None,
include_params: Optional[Dict[str, Any]] = None,
hdf5_compatible: bool = False) -> Dict[str, Any]:
def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = None, include_params: Optional[Dict[str, Any]] = None, hdf5_compatible: bool = False) -> Dict[str, Any]:
exclude_params = exclude_params or []
overloads = (include_params or {}).copy()
if is_hydra_environment():
hydra_cfg: HydraConf = HydraConfig.get()
if (sweep_config := hydra_cfg.get('sweeper')) is not None:
if (sweep_config := hydra_cfg.get("sweeper")) is not None:
# read sweep parameters from these keys:
for attr in ['grid_params', 'list_params', 'params']:
for attr in ["grid_params", "list_params", "params"]:
if sweep_params := sweep_config.get(attr):
# get key value from hydra-sweep-config and value from actual config
for p in sweep_params.keys():
......@@ -37,13 +38,21 @@ def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = N
if p not in exclude_params:
overloads[p] = OmegaConf.select(cfg, p)
else:
logger.warning("No HydraConfig found, sweep parameters not available.")
if hdf5_compatible:
# convert (only) values to hdf5 compatible types
overloads = dict(zip(overloads.keys(), convert_hdf5_compatible(overloads.values())))
overloads = convert_dict_to_hdf5_compatible(overloads)
return overloads
def convert_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
def convert_dict_to_hdf5_compatible(d: Dict[Any, Any]) -> List[Any]:
# convert (only) values to hdf5 compatible types
return dict(zip(d.keys(), convert_values_to_hdf5_compatible(d.values())))
def convert_values_to_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
converted = []
for v in values:
if fn := _CONVERT_DTYPES.get(type(v)):
......@@ -56,5 +65,6 @@ def convert_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
return converted
if __name__ == "__main__":
pass
......@@ -13,14 +13,14 @@ from numpy.typing import NDArray
from scipy.signal import freqz
from ensure import Check
from . import MIN_LEVEL_DB, MIN_POWER_LIN, FreqRespResult
from .spectrum import get_active_frames, csd_vs_time, fft_to_octave_bands
from .utils.misc import check_spectrum_arguments
from . import MIN_LEVEL_DB, MIN_LEVEL_POW, MIN_LEVEL_RMS, FreqRespResult
from .spectrum import FftSpectrumVsTime, AggregateFn, check_spectrum_arguments # get_active_frames, csd_vs_time, fft_to_octave_bands
#from .utils.misc import check_spectrum_arguments
log = logging.getLogger(__name__)
MIN_COHERENCE_RATIO = 0.25
NDIntArray = NDArray[int]
NDIntArray = NDArray[np.int64]
def smooth_impulse_response(h: NDArray, fs: int, fraction: int, nfft: Optional[int] = None,
......@@ -60,9 +60,12 @@ def estimate_ir_csd(y: NDArray, x: NDArray, fs: int, ntaps: int = 512, min_coher
noverlap: Optional[int] = None, nfft: Optional[int] = None,
fmin: Optional[float] = None, fmax: Optional[float] = None,
inactivity_fraction: Optional[float] = None,
agg_fn: Optional[AggregateFn] = None,
**kwargs) -> FreqRespResult: # Tuple[NDArray, bool, NDArray, NDArray]:
# check input arguments
axis: int = kwargs.pop('axis', 0)
agg_fn = agg_fn or 'mean'
direct_irfft: bool = kwargs.get('direct_irfft', False)
min_phase: bool = kwargs.get('min_phase', False)
fraction: int = kwargs.pop('fraction', None) or -1
......@@ -81,19 +84,37 @@ def estimate_ir_csd(y: NDArray, x: NDArray, fs: int, ntaps: int = 512, min_coher
ntaps_fir = ntaps + 1 if ntaps % 2 == 0 else ntaps
# auto/cross power spectra vs time
f, t, pxy = csd_vs_time(x, y, **spec_args)
_, _, pxx = csd_vs_time(x, x, **spec_args)
_, _, pyy = csd_vs_time(y, y, **spec_args)
spec_args = check_spectrum_arguments(fs, window, nperseg, noverlap, nfft, fmin=fmin, fmax=fmax, **kwargs)
spec_xy = FftSpectrumVsTime.from_signals(x, y=y, axis=axis, **spec_args)
spec_xx = FftSpectrumVsTime.from_signals(x, y=None, axis=axis, **spec_args)
spec_yy = FftSpectrumVsTime.from_signals(y, y=None, axis=axis, **spec_args)
t, f = spec_xy.time, spec_xy.get_freqs(all_freqs=True)
#f, t, pxy = csd_vs_time(x, y, **spec_args)
#_, _, pxx = csd_vs_time(x, x, **spec_args)
#_, _, pyy = csd_vs_time(y, y, **spec_args)
# check active frames -> all frames above inactivity-percentile
idx_active = np.ones_like(t, dtype=bool)
#idx_active = np.ones_like(t, dtype=bool)
if inactivity_fraction is not None:
idx_active = get_active_frames(x, inactivity_fraction, **spec_args)
idx_active = spec_xx.get_level_vs_time().get_active_frames(inactivity_fraction, weighting=None)
spec_xx = spec_xx.with_active_frames(idx_active)
spec_yy = spec_yy.with_active_frames(idx_active)
spec_xy = spec_xy.with_active_frames(idx_active)
#idx_active = spec_xx.get_level_vs_time().get_active_frames(inactivity_fraction, weighting=None)
#idx_active = get_active_frames(x, inactivity_fraction, **spec_args)
# average spectrum over (active) time
pxy = spec_xy.get_aggregate(agg_fn).get_values('pow', all_freqs=True)
pxx = spec_xx.get_aggregate(agg_fn).get_values('pow', all_freqs=True)
pyy = spec_yy.get_aggregate(agg_fn).get_values('pow', all_freqs=True)
# average spectrum over time
pxy = pxy[:, idx_active].mean(axis=-1)
pxx = pxx[:, idx_active].mean(axis=-1)
pyy = pyy[:, idx_active].mean(axis=-1)
#pxy = pxy[:, idx_active].mean(axis=-1)
#pxx = pxx[:, idx_active].mean(axis=-1)
#pyy = pyy[:, idx_active].mean(axis=-1)
# limit frequency range
idx_fmin = np.argmin(np.abs(f - fmin))
......@@ -110,11 +131,13 @@ def estimate_ir_csd(y: NDArray, x: NDArray, fs: int, ntaps: int = 512, min_coher
weighting[idx_fmax:] = np.linspace(1.0, 0.0, f.shape[0] - idx_fmax)
# transfer function
H1 = np.divide(pxy, np.maximum(pxx, MIN_POWER_LIN))
H1 = np.divide(pxy, np.maximum(pxx, MIN_LEVEL_POW))
H1 *= weighting
#H1 = np.sqrt(H1)
# check coherence within frequency range
cxy = np.abs(pxy) ** 2 / np.maximum(pxx * pyy, MIN_POWER_LIN ** 2) # identical to scipy.signal.coherence(y,x)
#cxy = np.abs(pxy) ** 2 / np.maximum(pxx * pyy, MIN_LEVEL_POW) # identical to scipy.signal.coherence(y,x)
cxy = np.abs(pxy) / np.maximum(np.sqrt(pxx*pyy), MIN_LEVEL_POW) # identical to scipy.signal.coherence(y,x)
idx_coherent = cxy >= min_coherence
idx_coherent_f = idx_coherent & idx_f
......
# -*- coding: utf-8 -*-
"""
Created on Sept 09 2024 17:55
from .types import (
ISpectrumVsTime, ISpectrumAvg, ILevelVsTime, AggregateFn, Weighting,
ValueRepresentation, TransformFn
)
from .misc import get_value_representation, to_db
from .impl_level_vs_time import LevelVsTime
from .impl_fft import FftSpectrumAvg, FftSpectrumVsTime
from .impl_bands import BandSpectrumVsTime, BandSpectrumAvg
from .spectral_args import SpectralArguments, check_spectrum_arguments
__all__ = [
'ISpectrumVsTime',
'ISpectrumAvg',
'ILevelVsTime',
'LevelVsTime',
'AggregateFn',
'Weighting',
'ValueRepresentation',
'TransformFn',
'get_value_representation',
'to_db',
'FftSpectrumAvg',
'FftSpectrumVsTime',
'BandSpectrumVsTime',
'BandSpectrumAvg',
'SpectralArguments',
'check_spectrum_arguments',
]
@author: Jan.Reimes
"""
import numpy as np
from numpy.typing import NDArray
from typing import Tuple, Optional, Iterable, List, Union, Any
from scipy.signal import spectrogram, welch, get_window
from scipy.signal._spectral_py import _spectral_helper
from . import MIN_POWER_LIN, MIN_LEVEL_DB
from .utils.misc import check_spectrum_arguments
def calculate_level(x: NDArray, fs: int, fmin: Optional[float] = None, fmax: Optional[float] = None,
window: str = 'boxcar', nperseg: Optional[int] = 8192, noverlap: Optional[int] = None,
window: str = 'boxcar', nperseg: int = 8192, noverlap: Optional[int] = None,
overlap: Optional[float] = None, nfft: Optional[int] = None, average: Optional[str] = 'mean',
axis: int = 0, **kwargs
) -> float|NDArray:
......@@ -46,8 +60,9 @@ def calculate_level(x: NDArray, fs: int, fmin: Optional[float] = None, fmax: Opt
level = 10 * np.log10(np.sum(pxx[idx_min:idx_max+1])) - wcf
return level
"""
'''
def fft_to_octave_bands(
fft_data: NDArray,
fs: int,
......@@ -145,44 +160,48 @@ def fft_to_octave_bands(
return band_center_freqs, octave_band_levels
'''
'''
def calculate_spectrum_vs_time(x: NDArray, fs: int, window: str = 'flattop', nperseg: Optional[int] = None,
noverlap: Optional[int] = None, nfft: Optional[int] = None,
fmin: Optional[float] = None, fmax: Optional[float] = None,
fraction: Optional[int] = None, is_transfer_function: bool = False, **kwargs
) -> Tuple[NDArray, NDArray, NDArray, float]:
fraction: Optional[int] = None, is_transfer_function: bool = False,
include_window_correction: bool = True, axis: int = -1, **kwargs
) -> Tuple[NDArray, NDArray, NDArray]:
# check fmin/fmax
fmin = fmin or 0.0
fmax = fmax or fs / 2
axis = axis if axis >= 0 else len(x.shape) + axis # real axis index
mode = kwargs.pop('mode', 'psd')
scaling = kwargs.pop('scaling', 'spectrum')
# check spectrum arguments
spec_args = check_spectrum_arguments(fs, window, nperseg, noverlap, nfft, **kwargs)
nperseg = spec_args['nperseg']
# reshape: spectrogram function expects signals as: channels x time
x = x.T if len(x.shape) > 1 else x[np.newaxis, :]
if x.shape[1] < nperseg:
#x = x.T if len(x.shape) > 1 else x[np.newaxis, :]
x = np.moveaxis(x, axis, 0)
if (d := (nperseg-x.shape[0])) > 0: # x.shape[axis] < nperseg:
# zero pad for small signals
x = np.pad(x, ((0,0), (0, nperseg-x.shape[1])))
x = np.pad(x, (0, d))
# window correction factor (energy loss due to windowing)
win: NDArray = spec_args['window']
wcf = 10 * np.log10(np.sum(win ** 2) / win.shape[0]) - 20 * np.log10(np.abs(win).mean())
# use STFT for calculating average and estimating noise spectrum
freq, time, spec = spectrogram(x,
freq, time, spec = _spectral_helper(x, x,
detrend='constant',
mode=mode,
axis=-1,
scaling=scaling,
axis=0,
**spec_args
)
# re-order: frequencies x time x channels
spec = np.abs(np.moveaxis(spec, 0, -1))
if fraction is not None and fraction >= 1:
freq, spec_bands = fft_to_octave_bands(spec, fs,
is_transfer_function=is_transfer_function, fraction=fraction, axis=0
......@@ -190,53 +209,60 @@ def calculate_spectrum_vs_time(x: NDArray, fs: int, window: str = 'flattop', npe
else:
spec_bands = spec
# limit to fmin/fmax
idx_min = np.argmin(np.abs(freq - fmin))
idx_max = np.argmin(np.abs(freq - fmax))
freq = freq[idx_min:idx_max+1]
spec_bands = spec_bands[idx_min:idx_max+1, :]
return freq, time, spec_bands, wcf
# re-order: frequencies x time x channels
spec_bands = np.abs(np.moveaxis(spec_bands, 0, axis))
return freq, time, spec_bands
'''
'''
def calculate_spectrum(signals: Union[Iterable[NDArray], NDArray], fs: int,
window: str = 'flattop', nperseg: Optional[int] = None, noverlap: Optional[int] = None,
nfft: Optional[int] = None, fmin: Optional[float] = None, fmax: Optional[float] = None,
fraction: Optional[int] = None, noise_percentile: float = 3.0,
inactivity_fraction: Optional[float] = None, ref_idx: int = 0,
**kwargs) -> Tuple[NDArray, List[NDArray], List[NDArray], float]:
include_window_correction: bool = True, axis: int = 0,
**kwargs) -> Tuple[NDArray, List[NDArray], List[NDArray]]:
# init output variables
spec_avg_output = []
noise_est_output = []
freq, wcf = None, 0.0 # avoid warnings
freq = None # avoid warnings
# spectrum parameters will be checked in calculate_spectrum_vs_time(), only collect here
spec_args = dict(fs=fs, window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft,
fmin=fmin, fmax=fmax, overlap=kwargs.pop("overlap", None))
fmin=fmin, fmax=fmax, overlap=kwargs.pop("overlap", None), mode=kwargs.pop('mode', 'psd'),
scaling=kwargs.pop('scaling', 'spectrum'), fraction=fraction
)
if single_array := isinstance(signals, np.ndarray):
signals = [signals]
# inactivity_fraction provided: use reference signal for activity detection
if inactivity_fraction:
idx_active = get_active_frames(signals[ref_idx], inactivity_fraction, **spec_args)
else:
idx_active = None
#if inactivity_fraction:
# idx_active = get_active_frames(signals[ref_idx], inactivity_fraction, **spec_args)
# add fraction parameter after active frame detection
spec_args['fraction'] = fraction
for x in signals:
freq, time, spec_bands, wcf = calculate_spectrum_vs_time(x, **spec_args, **kwargs)
freq, time, spec_bands = calculate_spectrum_vs_time(x, axis=axis, **spec_args, **kwargs)
# apply active frames?
if idx_active is not None:
spec_bands = spec_bands[:, idx_active]
# average vs time & RMS domain
spec_bands_avg = np.sqrt(spec_bands).mean(axis=1)
noise_est = np.percentile(np.sqrt(spec_bands), noise_percentile, axis=1)
spec_bands_avg = np.sqrt(spec_bands.mean(axis=-1))
noise_est = np.sqrt(np.percentile(spec_bands, noise_percentile, axis=-1))
# report
spec_avg_output.append(spec_bands_avg)
......@@ -244,25 +270,26 @@ def calculate_spectrum(signals: Union[Iterable[NDArray], NDArray], fs: int,
if single_array:
# if input was not a list, only return array
return freq, spec_avg_output[0], noise_est_output[0], wcf
return freq, spec_avg_output[0], noise_est_output[0]
else:
return freq, spec_avg_output, noise_est_output, wcf
return freq, spec_avg_output, noise_est_output
'''
'''
def csd_vs_time(x: NDArray, y:NDArray, **spec_args) -> Tuple[NDArray, NDArray, NDArray]:
# helper function to calculate cross spectral density vs time (currently not supported by scipy.signal)
spec_args = check_spectrum_arguments(**spec_args)
freqs, time, pxy = _spectral_helper(x, y, mode='psd', **spec_args)
return freqs, time, pxy
'''
# def get_active_frames(x: NDArray, inactivity_fraction: Optional[float] = None, **spec_args) -> NDArray:
# # check active frames -> all frames above inactivity-percentile
# lxx = level_vs_time(x, in_db=True, **spec_args)
# activity_threshold_db = np.percentile(lxx, 100 * inactivity_fraction)
# return lxx >= activity_threshold_db
def get_active_frames(x: NDArray, inactivity_fraction: Optional[float] = None, **spec_args) -> NDArray:
# check active frames -> all frames above inactivity-percentile
lxx = level_vs_time(x, in_db=True, **spec_args)
activity_threshold_db = np.percentile(lxx, 100 * inactivity_fraction)
return lxx >= activity_threshold_db
"""
def level_vs_time(
x: NDArray, fmin: float = None, fmax: float = None, in_db: bool = True, **spec_args
) -> NDArray:
......@@ -281,7 +308,7 @@ def level_vs_time(
return 10 * np.log10(np.maximum(lxx, MIN_POWER_LIN))
else:
return lxx
"""
if __name__ == "__main__":
pass
"""
Created on: 06 Jun 2025 - 17:04
Author: jan.reimes@head-acoustics.com
Abstract base class for spectrum averaging functionality.
Provides aggregation of time-varying spectrum data using configurable functions.
Supports various value representations, weightings, and level calculations.
"""
import numpy as np
from numpy.typing import NDArray
from functools import partial, total_ordering
from abc import ABC
from typing import Callable, TypeAlias, Optional, Self
from .misc import get_value_representation
from .spectral_args import SpectralArguments
from .types import ISpectrumVsTime, ISpectrumAvg, AggregateFn, Weighting, ValueRepresentation, TransformFn, Ax
@total_ordering
class SpectrumAvgBase(ABC, ISpectrumAvg):
_ref_spec_vs_time: ISpectrumVsTime
_aggregate: AggregateFn = 'mean'
def __init__(self, ref_spec_vs_time: ISpectrumVsTime, aggregate: AggregateFn = 'mean', **kwargs):
"""
Initialize the SpectrumAvg with a reference SpectrumVsTime instance and an aggregation function.
:param ref_spec_vs_time: The reference SpectrumVsTime instance to aggregate.
:param aggregate: The aggregation function to use (default is 'mean').
"""
self._ref_spec_vs_time = ref_spec_vs_time
self._aggregate = aggregate
@property
def x(self) -> NDArray:
return self._ref_spec_vs_time.x
@property
def y(self) -> NDArray:
return self._ref_spec_vs_time.y
@property
def fs(self) -> int:
return self._ref_spec_vs_time.fs
@property
def is_2d(self) -> bool:
return self._ref_spec_vs_time.is_2d
@property
def spectrum_args(self) -> SpectralArguments:
"""
Return the spectral arguments used for the (internal) spectrum.
"""
return self._ref_spec_vs_time.spectrum_args
def get_freqs(self, all_freqs: bool = False) -> NDArray:
return self._ref_spec_vs_time.get_freqs(all_freqs)
def get_values(self, representation: ValueRepresentation = 'rms', weighting: Weighting = None, all_freqs: bool = False,
value_transform: Optional[TransformFn] = None, include_window_correction: bool = True) -> NDArray:
# use value_transform instead of default aggregate function (-> self._aggregate)
if value_transform is not None:
agg_fn = value_transform
else:
if isinstance(self._aggregate, str):
agg_fn = getattr(np, self._aggregate) # ensure aggregate method exists
agg_fn = partial(agg_fn, axis=Ax.TIME)
else:
agg_fn = self._aggregate
# ToDo: check if agg_fn has an argument 'axis' and set it to Ax.TIME (?)
return self._ref_spec_vs_time.get_values(representation, weighting, all_freqs, value_transform=agg_fn,
include_window_correction=include_window_correction)
def get_level(self, representation: ValueRepresentation = 'dB', weighting: Weighting = None, all_freqs: bool = False,
include_window_correction: bool = True) -> float|NDArray:
"""
Calculate the level of the spectrum.
Args:
weighting (Weighting): Optional weighting to apply (e.g., 'A', 'B', 'C', 'Z').
in_db (bool): If True, return level in dB, otherwise in linear scale.
"""
v = self.get_values(self._int_vr, weighting, all_freqs, include_window_correction=include_window_correction)
return get_value_representation(v.sum(axis=Ax.FREQ), representation, self._int_vr)
def __add__(self, other: Self) -> 'SpectrumAvgComposed':
"""Add two spectrum averages."""
return SpectrumAvgComposed(self, other, np.add, internal_representation='pow')
def __sub__(self, other: Self) -> 'SpectrumAvgComposed':
"""Subtract two spectrum averages."""
def _subtract(a: NDArray, b: NDArray) -> NDArray:
"""Subtract two arrays element-wise."""
return np.maximum(a - b, 0.0) # Ensure no negative values
return SpectrumAvgComposed(self, other, _subtract, internal_representation='pow')
def __truediv__(self, other: Self) -> 'SpectrumAvgComposed':
"""Divide two spectrum averages."""
def _divide(a: NDArray, b: NDArray, div_epsilon=1e-10) -> NDArray:
"""Divide two arrays element-wise."""
return np.divide(a, np.maximum(b, div_epsilon)) # Avoid division by zero
return SpectrumAvgComposed(self, other, _divide, internal_representation='rms')
def __le__(self, other: Self) -> 'SpectrumAvgComposed':
"""Less than or equal comparison for spectrum averages."""
def _less_equal(a: NDArray, b: NDArray) -> NDArray:
"""Element-wise less than or equal comparison."""
return np.less_equal(a, b)
return SpectrumAvgComposed(self, other, _less_equal, internal_representation='pow')
ComposeOp: TypeAlias = Callable[[NDArray, NDArray], NDArray]
class SpectrumAvgComposed(SpectrumAvgBase):
"""
Composed spectrum average that combines two spectrum averages using a mathematical operation.
"""
_left: SpectrumAvgBase
_right: SpectrumAvgBase
_operation: ComposeOp
_internal_representation: ValueRepresentation
def __init__(self, left: SpectrumAvgBase, right: SpectrumAvgBase, operation: ComposeOp, internal_representation: ValueRepresentation = 'rms'):
"""
Initialize with two spectrum averages and an operation to combine them.
Args:
left: Left operand spectrum average
right: Right operand spectrum average
operation: Function to combine the values (e.g., lambda a, b: a + b)
"""
# Validate compatibility of frequency bins, time, fs, axis, and spectrum arguments
if left.fs != right.fs:
raise ValueError("Spectrum averages must have matching sampling frequencies (fs)")
if left.spectrum_args != right.spectrum_args:
raise ValueError("Spectrum averages must have matching spectral arguments")
if not np.array_equal(left.get_freqs(), right.get_freqs()):
raise ValueError("Spectrum averages must have matching frequency bins")
if not np.array_equal(left._ref_spec_vs_time.time, right._ref_spec_vs_time.time):
raise ValueError("Spectrum averages must have matching time vectors")
# Use the left operand as the reference for properties
self._left = left
self._right = right
self._operation = operation
self._internal_representation = internal_representation
super().__init__(left._ref_spec_vs_time, left._aggregate)
@property
def _int_vr(self) -> ValueRepresentation:
# override this otherwise constant value only for composed spectra
return self._internal_representation
def _lazy_initialize(self) -> bool:
return self._left._lazy_initialize() and self._right._lazy_initialize()
def get_values(self, representation: ValueRepresentation = 'rms', weighting: Weighting = None, all_freqs: bool = False,
value_transform: Optional[TransformFn] = None, include_window_correction: bool = True) -> NDArray:
"""Get values by applying the operation to both operands."""
left_values = self._left.get_values(self._int_vr, weighting, all_freqs, value_transform=value_transform,
include_window_correction=include_window_correction)
right_values = self._right.get_values(self._int_vr, weighting, all_freqs, value_transform=value_transform,
include_window_correction=include_window_correction)
return get_value_representation(self._operation(left_values, right_values), representation, self._int_vr)
if __name__ == "__main__":
pass
\ No newline at end of file
"""
Created on: 11 Jun 2025 - 13:57
Author: jan.reimes@head-acoustics.com
Abstract base class for time-varying spectrum functionality.
Provides spectrum analysis over time with frequency filtering, weighting, and aggregation.
Supports noise estimation, level calculation, and frame-based processing.
"""
import numpy as np
from typing import Optional, Tuple, Self
from numpy.typing import NDArray
from functools import partial
from copy import deepcopy
from abc import ABC
from .impl_level_vs_time import LevelVsTime
from .misc import get_value_representation
from .spectral_args import SpectralArguments
from .types import ISpectrumVsTime, ISpectrumAvg, ILevelVsTime, Weighting, ValueRepresentation, TransformFn, Ax
from .weighting import get_weighting
class SpectrumVsTimeBase(ABC, ISpectrumVsTime):
_x: NDArray
_y: NDArray
_spectrum_args: SpectralArguments
_freq: Optional[NDArray] = None
_time: Optional[NDArray] = None
_values: Optional[NDArray] = None
_active_frames: Optional[NDArray[np.bool_]] = None
@property
def x(self) -> NDArray:
return self._x
@property
def y(self) -> NDArray:
return self._y
@property
def is_2d(self) -> bool:
return self._x.ndim == 2
@property
def fs(self) -> int:
return round(self.spectrum_args.fs)
@property
def time(self) -> NDArray:
self._lazy_initialize()
if self._time is None:
return np.empty(0, dtype=np.float64)
else:
return self._time
def get_freqs(self, all_freqs: bool = False) -> NDArray:
f_idx_min, f_idx_max = self._get_freq_indices(all_freqs)
if self._freq is None:
return np.empty(0, dtype=np.float64)
else:
return self._freq[f_idx_min:f_idx_max]
@property
def spectrum_args(self) -> SpectralArguments:
return self._spectrum_args
def _get_freq_indices(self, all_freqs: bool = False) -> Tuple[int, int]:
"""
Get the indices for the frequency range based on fmin and fmax.
"""
self._lazy_initialize()
idx_min, idx_max = 0, -1
if isinstance(self._freq, np.ndarray):
if all_freqs:
idx_min = 0
idx_max = self._freq.shape[0]
else:
fmin = self.spectrum_args.fmin or 0
fmax = self.spectrum_args.fmax or -1
idx_min = np.argmin(np.abs(self._freq - fmin))
idx_max = np.argmin(np.abs(self._freq - fmax))+1
return int(idx_min), int(idx_max)
def _lazy_initialize(self) -> bool: ...
def get_values(self, representation: ValueRepresentation = 'rms', weighting: Weighting = None, all_freqs: bool = False,
value_transform: Optional[TransformFn] = None, include_window_correction: bool = True) -> NDArray:
self._lazy_initialize()
# limit in frequency range
f_idx_min, f_idx_max = self._get_freq_indices(all_freqs)
v = self._values[f_idx_min:f_idx_max]
# limit time frames
v = v[:, self._active_frames] if (self._active_frames is not None) else v
# apply weighting if specified
w = get_weighting(self.get_freqs(all_freqs), weighting, self._int_vr)
w = np.expand_dims(w, axis=list(range(len(v.shape)))[1:])
v_w = v * w
# apply transformation if specified, like e.g., aggregation for noise estimation/avergage vs time
if value_transform is not None:
v_w = value_transform(v_w)
# apply window correction if specified
if include_window_correction:
v_w *= self.spectrum_args.get_window_correction(self._int_vr)
return get_value_representation(v_w, representation, self._int_vr)
def get_noise_estimate(self, percentile: float = 5.0) -> ISpectrumAvg:
"""
Estimate noise level from the spectrum.
Args:
percentile (float): Percentile to use for noise estimation.
Returns:
np.ndarray: Estimated noise level in dB.
"""
return self.get_aggregate(aggregate=partial(np.percentile, q=percentile, axis=Ax.TIME))
def get_level_vs_time(self) -> ILevelVsTime:
"""
Calculate the level of the spectrum over time.
Args:
weighting (Weighting): Optional weighting to apply.
in_db (bool): If True, return level in dB, otherwise in linear scale.
Returns:
NDArray: Level of the spectrum over time.
"""
return LevelVsTime(self)
def with_active_frames(self, idx_active: NDArray[np.bool_], pad_value: bool = False) -> Self:
self._lazy_initialize()
idx_active = np.array(idx_active, dtype=np.bool_)
if idx_active.shape[0] > self.time.shape[0]:
# crop
idx_active = idx_active[:self.time.shape[0]]
elif idx_active.shape[0] < self.time.shape[0]:
# pad with True/False
idx_active_base = np.ones_like(self.time, dtype=np.bool_) if pad_value else np.zeros_like(self.time, dtype=np.bool_)
idx_active_base[:idx_active.shape[0]] = idx_active
idx_active = idx_active_base
# make copy & patch active frames object
active_spec_vs_time = deepcopy(self)
active_spec_vs_time._active_frames = idx_active
return active_spec_vs_time
if __name__ == "__main__":
pass
\ No newline at end of file
"""
Created on: 08 May 2025 - 20:38
Author: jan.reimes@head-acoustics.com
"""
from typing import LiteralString, Dict, Optional, Any, Protocol, Type, Tuple
import numpy as np
import scipy.interpolate as si
from numpy.typing import NDArray
from acoustics.standards.iec_61260_1_2014 import lower_frequency, upper_frequency, index_of_frequency
from acoustics.standards.iec_61260_1_2014 import nominal_center_frequency, exact_center_frequency
from .types import ValueRepresentation, FrequencyBands
from .misc import get_value_representation
class _Interpolator(Protocol):
def __init__(self, x: NDArray, y: NDArray, **kwargs: Any) -> None: ...
def __call__(self, xf: NDArray) -> NDArray: ...
InterpolatorCls = Type[_Interpolator]
def get_bands_iec_61260_1_2014(fraction: int, fmin: float = 20.0, fmax: float = 20_000.0) -> FrequencyBands:
start_idx = index_of_frequency(fmin, fraction=fraction)
end_idx = index_of_frequency(fmax, fraction=fraction)
exact_center_freqs = [exact_center_frequency(i, fraction=fraction) for i in range(start_idx, end_idx + 1)]
center_freqs = [nominal_center_frequency(f_e, fraction=fraction) for f_e in exact_center_freqs]
lower_freqs = [lower_frequency(f_e, fraction=fraction) for f_e in center_freqs]
upper_freqs = [upper_frequency(f_e, fraction=fraction) for f_e in center_freqs]
return FrequencyBands.from_center_lower_upper_freqs(center_freqs, lower_freqs, upper_freqs,
title=f"IEC 61260-1:2014 1/{fraction}-octave bands"
)
def fft_to_octave_bands(
fft_data: NDArray,
fs: int,
bands: FrequencyBands,
input_domain: ValueRepresentation = 'rms',
output_domain: ValueRepresentation = 'pow',
is_transfer_function: bool = False,
clip_output_db: Optional[float] = None,
axis: int = 0,
interpolator: LiteralString = 'akima1d',
interpolator_kwargs: Optional[Dict[str, Any]] = None,
) -> Tuple[NDArray, NDArray]:
# determine the interpolator
interpolator_kwargs = interpolator_kwargs or {}
all_interpolators = [intp.lower() for intp in si.__all__]
idx = -1 if (intp := f"{interpolator}Interpolator".lower()) not in all_interpolators else all_interpolators.index(intp)
if idx < 0:
raise ValueError(f"Interpolator '{interpolator}Interpolator' is not supported. Available interpolators: {si.__all__}")
Interpolator: InterpolatorCls = getattr(si, si.__all__[idx] , None) # type: ignore
# apply summation always in domain 'pow' -> check/transform:
fft_data = get_value_representation(fft_data, 'pow', input_domain)
# Ensure we work with magnitudes (handle complex fft_data)
fft_data = np.abs(fft_data)
# Move the FFT axis to the front for easier computation
fft_data = np.moveaxis(fft_data, axis, 0)
# Derive n_fft and frequency vector from fft_data shape along the specified axis
n_fft = (fft_data.shape[0] - 1) * 2
freqs = np.fft.rfftfreq(n_fft, 1/fs)
# Calculate clipping level in linear domain from dB
clip_level_linear = 0.0 if clip_output_db is None else get_value_representation(clip_output_db, 'pow', 'dB')
# Initialize an array to store the magnitude/power in each octave band
output_shape = (len(bands),) + fft_data.shape[1:]
octave_band_levels = np.zeros(output_shape)
# Loop over each octave band
for i, band in enumerate(bands):
w = band.get_freq_resp(fs, freqs)
w = np.power(w, 2) # Convert to power response if needed
w = np.expand_dims(w, axis=tuple(range(1, len(fft_data.shape))))
# transfer function or power spectrum?
if is_transfer_function:
idx_f1 = np.argmin(np.abs(freqs - band.lower_freq))
idx_f2 = np.argmin(np.abs(freqs - band.upper_freq)) + 1
octave_band_levels[i] = np.mean(w[idx_f1:idx_f2] * fft_data[idx_f1:idx_f2], axis=0)
else:
octave_band_levels[i] = np.sum(w * fft_data, axis=0)
"""
# Band edges
f_low, f_center, f_high = band.lower_freq, band.center_freq, band.upper_freq
# Find the FFT bins that fall within this frequency band
op_high = np.less_equal if i == len(bands) - 1 else np.less
idx_band = np.where(np.greater_equal(freqs, f_low) & op_high(freqs, f_high))[0]
if len(idx_band) > 0:
idx_center = np.argmin(np.abs(freqs - f_center))
if idx_center < idx_band[0] or idx_center > idx_band[-1]:
raise ValueError(f"Center frequency {f_center} is not within the band {band}.")
# Calculate the weighting for the band edges
w = np.ones_like(idx_band, dtype=float)
if len(idx_band) >= 3:
w_edge = 10 ** (band.cutoff_attenuation / 10.0)
y = np.array([w_edge, 1.0, w_edge])
x = np.array([f_low, f_center, f_high])
xf = np.log(freqs[idx_band])
x = np.log(x)
w = Interpolator(x, y, **interpolator_kwargs)(xf)
octave_band_levels[i] = aggregate_freq(w[:, None] * fft_data[idx_band], axis=0)
#if is_transfer_function:
# # Average the magnitudes for transfer functions
# octave_band_levels[i] = np.mean(w * fft_data[idx_band], axis=0)
#else:
# # Sum the magnitudes/power (use squared magnitudes for power)
# fft_pow = np.square(w[:,None] * fft_data[idx_band])
# octave_band_levels[i] = np.sqrt(np.sum(fft_pow, axis=0))
else:
# Linear interpolation when no FFT bin falls within the band
if f_low < freqs[0] or f_high > freqs[-1]:
# If out of bounds, set to zero
octave_band_levels[i] = 0
else:
# Perform linear interpolation for each slice along the remaining dimensions
lower_bin = np.searchsorted(freqs, f_low, side='right') - 1
upper_bin = np.searchsorted(freqs, f_high, side='left')
# Linear interpolation using apply_along_axis
def interpolate(slice_data: NDArray) -> NDArray:
return np.interp(
f_center,
[freqs[lower_bin], freqs[upper_bin]],
[slice_data[lower_bin], slice_data[upper_bin]]
)
octave_band_levels[i] = np.apply_along_axis(interpolate, axis=0, arr=fft_data)
"""
# Apply clipping in the linear domain (convert clip_output_db to linear)
octave_band_levels = np.maximum(octave_band_levels, clip_level_linear)
# Move the axis back to its original position
octave_band_levels = np.moveaxis(octave_band_levels, 0, axis)
octave_band_levels = get_value_representation(octave_band_levels, output_domain, 'pow')
return bands.center_freqs, octave_band_levels
if __name__ == "__main__":
pass
\ No newline at end of file
"""
Created on: 11 Jun 2025 - 14:16
Author: jan.reimes@head-acoustics.com
"""
from numpy.typing import NDArray
from .bands import fft_to_octave_bands
from .types import AggregateFn, ISpectrumVsTime, ISpectrumAvg, FrequencyBands, Ax
from .abc_spec_vs_time import SpectrumVsTimeBase
from .abc_spec_avg import SpectrumAvgBase
from .spectral_args import SpectralArguments
class BandSpectrumVsTime(SpectrumVsTimeBase, ISpectrumVsTime):
_fft_spec_vs_time: SpectrumVsTimeBase
_bands: FrequencyBands
def __init__(self, fft_spectrum_vs_time: SpectrumVsTimeBase, bands: FrequencyBands):
if not isinstance(bands, FrequencyBands):
raise TypeError("bands must be an instance of FrequencyBands.")
self._bands = bands
self._fft_spec_vs_time = fft_spectrum_vs_time
def _lazy_initialize(self) -> bool:
"""
Lazy initialization of frequency, time, and values attributes - including transformation into frequency bands.
This method should be called before accessing freq, time, or values properties.
"""
if (uninitialized := (self._freq is None or self._time is None or self._values is None)):
# initialize frequency and time from the FFT vs time spectrum
self._fft_spec_vs_time._lazy_initialize()
# link time
self._time = self._fft_spec_vs_time.time
# aggregate the FFT spectrum into frequency bands
v = self._fft_spec_vs_time._values
self._freq, self._values = fft_to_octave_bands(v, self.fs, bands=self._bands, axis=Ax.FREQ, is_transfer_function=False,
input_domain='pow', output_domain='pow'
)
return uninitialized
@property
def x(self) -> NDArray:
return self._fft_spec_vs_time.x
@property
def y(self) -> NDArray:
return self._fft_spec_vs_time.y
@property
def fs(self) -> int:
return self._fft_spec_vs_time.fs
@property
def is_2d(self) -> int:
return self._fft_spec_vs_time.is_2d
@property
def spectrum_args(self) -> SpectralArguments:
return self._fft_spec_vs_time.spectrum_args
def get_aggregate(self, aggregate: AggregateFn = "mean") -> ISpectrumAvg:
"""
Aggregate the spectrum over time using the specified aggregate method.
Args:
aggregate (Literal["mean", "median"]): Method to use for aggregation.
"""
return BandSpectrumAvg(self, aggregate=aggregate)
class BandSpectrumAvg(SpectrumAvgBase, ISpectrumAvg):
_ref_spec_vs_time: BandSpectrumVsTime
_aggregate: AggregateFn = 'mean'
@property
def x(self) -> NDArray:
return self._ref_spec_vs_time.x
@property
def y(self) -> NDArray:
return self._ref_spec_vs_time.y
@property
def fs(self) -> int:
return self._ref_spec_vs_time.fs
@property
def is_2d(self) -> int:
return self._ref_spec_vs_time.is_2d
@property
def spectrum_args(self) -> SpectralArguments:
return self._ref_spec_vs_time.spectrum_args
if __name__ == "__main__":
pass
\ No newline at end of file
"""
Created on: 04 Jun 2025 - 08:49
Author: jan.reimes@head-acoustics.com
This module implements FFT-based spectrum analysis for time-frequency analysis.
Provides classes for computing spectrograms and averaged spectra from audio signals
using windowed FFT with configurable parameters and aggregation methods.
"""
import numpy as np
from dataclasses import asdict
from typing import Optional, Self
from numpy.typing import NDArray
from ensure import Check
from scipy.signal._spectral_py import _spectral_helper
from .bands import get_bands_iec_61260_1_2014
from .types import AggregateFn, ISpectrumVsTime, ISpectrumAvg
from .abc_spec_vs_time import SpectrumVsTimeBase
from .abc_spec_avg import SpectrumAvgBase
from .spectral_args import SpectralArguments
class FftSpectrumAvg(SpectrumAvgBase, ISpectrumAvg):
@classmethod
def from_signals(cls, x: NDArray, fs: int, y: Optional[NDArray] = None, aggregate: AggregateFn = 'mean', axis: int = 0, **kwargs) -> Self:
spec_args, kwargs = SpectralArguments.from_kwargs(fs=fs, **kwargs)
return cls(FftSpectrumVsTime.from_signals(x, y=y, axis=axis, **asdict(spec_args)), aggregate, **kwargs)
def to_bands_vs_time(self, fraction: int) -> "BandSpectrumAvg":
"""
Convert the FFT spectrum average to bands vs time.
Args:
fraction (int): Fraction of the frequency range to use for banding.
Returns:
BandSpectrumAvg: Bands vs time object.
"""
if isinstance(self._ref_spec_vs_time, FftSpectrumVsTime):
return self._ref_spec_vs_time.to_bands_vs_time(fraction).get_aggregate(self._aggregate)
else:
raise TypeError(f"Cannot convert {self._ref_spec_vs_time.__class__.__name__} to average band spectrum.")
class FftSpectrumVsTime(SpectrumVsTimeBase, ISpectrumVsTime):
@classmethod
def from_signals(cls, x: NDArray, fs: int, y: Optional[NDArray] = None, axis: int = 0, **kwargs) -> Self:
"""
Initialize the FFT vs Time object.
Args:
x (NDArray): Input audio signal (1D or 2D).
fs (int): Sampling frequency.
window (str): Window type for FFT.
nfft (int): Number of FFT points.
fmin (float): Minimum frequency for analysis.
fmax (float): Maximum frequency for analysis.
overlap (float): Overlap percentage for FFT windows.
"""
kwargs['fs'] = fs
spec_args, kwargs = SpectralArguments.from_kwargs(**kwargs)
return cls(x, spec_args, y, axis=axis, **kwargs)
def __init__(self, x: NDArray, spec_args: SpectralArguments, y: Optional[NDArray] = None, axis: int = 0, **kwargs):
self._spectrum_args = spec_args
self._active_frames = kwargs.get('active_frames', None)
always_2d = kwargs.get('always_2d', False)
# turn input signals always into (time, channels) format
x = np.moveaxis(x, axis, 0)
if (d := (self.spectrum_args.nperseg-x.shape[0])) > 0: # x.shape[axis] < nperseg:
# zero pad for small signals
x = np.pad(x, (0, d))
if y is not None:
y = np.moveaxis(y, axis, 0)
# zero pad x/y to match length
max_len: int = np.maximum(x.shape[0], y.shape[0])
if y.shape[0] < max_len:
y = np.pad(y, (0, max_len - y.shape[0]))
if x.shape[0] < max_len:
x = np.pad(x, (0, max_len - x.shape[0]))
else:
y = x
# check if x is 1D or 2D
Check(x.ndim in (1, 2)).is_true(f"x must be 1D or 2D, got {x.ndim}D.")
# check if x and y have the same shape
Check(x.shape == y.shape).is_true(f"x and y must have the same shape, got {x.shape} and {y.shape}.")
# ensure x is always 2D, if always_2d is True
if always_2d and x.ndim == 1:
x = x.reshape((-1, 1))
y = y.reshape((-1, 1))
self._y = y
self._x = x
def _lazy_initialize(self) -> bool:
"""
Lazy initialization of frequency, time, and values attributes.
This method should be called before accessing freq, time, or values properties.
Returns True if initialization was (successfully) run, False if it was already initialized.
"""
if (uninitialized := (self._freq is None or self._time is None or self._values is None)):
spec_args = dict(
fs=float(self.fs),
window=self.spectrum_args.window,
nperseg=self.spectrum_args.nperseg,
noverlap=self.spectrum_args.noverlap,
nfft=self.spectrum_args.nfft,
detrend='constant',
return_onesided=True,
scaling=self.spectrum_args.scaling,
axis=0, # time axis of signals is now always on axis=0
mode=self.spectrum_args.mode,
boundary=None,
padded=self.spectrum_args.padded
)
x, y = (self.x, self.y) if self.is_2d else (self.x.reshape((-1, 1)), self.y.reshape((-1, 1)))
data = [] # multi-channel data
for i in range(x.shape[1]):
self._freq, self._time, d = _spectral_helper(x[:,i], y[:, i], **spec_args)
data.append(d)
# stack data along the last axis to create a multi-channel spectrum
self._values = np.stack(data, axis=-1)
if not self.is_2d:
self._values = self._values.squeeze(axis=-1)
if np.isclose(np.abs(self._values), np.real(self._values)).all():
self._values = np.abs(self._values) # use abs values if complex values are not present
if self._active_frames is None:
# if no active frames are given, use all frames
self._active_frames = np.ones(self._time.shape, dtype=np.bool_)
return uninitialized
def get_aggregate(self, aggregate: AggregateFn = "mean") -> ISpectrumAvg:
"""
Aggregate the spectrum over time using the specified aggregate method.
Args:
aggregate (Literal["mean", "median"]): Method to use for aggregation.
"""
return FftSpectrumAvg(self, aggregate=aggregate)
def to_bands_vs_time(self, fraction: int) -> ISpectrumVsTime:
"""
Convert the FFT spectrum to bands vs time.
Args:
fraction (int): Fraction of the frequency range to use for banding.
Returns:
BandSpectrumVsTime: Bands vs time object.
"""
from .impl_bands import BandSpectrumVsTime
bands = get_bands_iec_61260_1_2014(fraction, fmin=self.spectrum_args.fmin, fmax=self.spectrum_args.fmax)
return BandSpectrumVsTime(self, bands)
if __name__ == "__main__":
pass
\ No newline at end of file