Skip to content
Commits on Source (3)
# Nonlinearity Measure - Reference implementation of TS 104063 # Nonlinearity Measure - Reference implementation of ETSI TS 104 063
Code for non-linearity measure for upcoming ETSI TS 104 063. Includes also verification and testing with simple non-linear models. Code for non-linearity measure for upcoming ETSI TS 104 063. Includes also verification and testing with simple non-linear models.
...@@ -56,12 +56,53 @@ Configuration of the whole project is based on `hydra-core` package, see https:/ ...@@ -56,12 +56,53 @@ Configuration of the whole project is based on `hydra-core` package, see https:/
# API access # API access
See `calculate_nlm_file(..)` for file-based calculations or `calculate_nlm(..)` for numpy/array-based calculations in `nonlinearity.calculate.py`. See `calculate_nlm_file(..)` for file-based calculations or `calculate_nlm(..)` for numpy/array-based calculations in `nonlinearity.calculate.py`.
# Debug data
Dependencies for showing and saving extended result data are installed as follows:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --extra debug
~~~
Or install all extra dependencies:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --all-extras
~~~
# Advanced
The ITU-T P.56 active speech level implementation in Python is rather slow. You may compile it for better performance.
Ensure that all development dependencies are installed:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --dev
~~~
Compile with e.g., nuitka:
~~~terminal
../ts104063-nonlinearity-measure> uv run nuitka --module nonlinearity/utils/p56/asl.py --include-module=numpy --output-dir=nonlinearity/utils/p56/build
[ ... ]
../ts104063-nonlinearity-measure> uv run nuitka --module nonlinearity/utils/p56/prefilter.py --include-module=numpy --include-module=scipy.signal --output-dir=nonlinearity/utils/p56/build --remove-output
~~~
# References # References
(list is for sure incomplete; further input is appreciated!)
## Sweep-based system identification
https://ant-novak.com/posts/research/2015-10-30_JAES_Swept/
https://hal.science/hal-02504303/document
https://www.melaudia.net/zdoc/sweepSine.PDF
## Non-linearities & Modelling
https://www.researchgate.net/publication/286670404_Characterisation_and_modelling_of_non-linear_loudspeakers https://www.researchgate.net/publication/286670404_Characterisation_and_modelling_of_non-linear_loudspeakers
https://www.hsu-hh.de/ant/wp-content/uploads/sites/699/2018/04/Eichas_VA_Modeling_of_guitar_amps_with_WH_models.pdf https://www.hsu-hh.de/ant/wp-content/uploads/sites/699/2018/04/Eichas_VA_Modeling_of_guitar_amps_with_WH_models.pdf
https://openhsu.ub.hsu-hh.de/server/api/core/bitstreams/0b6f9c6f-c444-45f7-b392-e2ff4188d25b/content https://openhsu.ub.hsu-hh.de/server/api/core/bitstreams/0b6f9c6f-c444-45f7-b392-e2ff4188d25b/content
https://hal.science/hal-02504303/document ## Related Distortion Measures
https://www.listeninc.com/wp/media/2023/06/paper_121AES_non_coherent_distortion_Listen_Inc.pdf
https://www.klippel.de/fileadmin/klippel/Files/Know_How/Application_Notes/AN_07_Weighted_Harmonic_Distortion_%28HI-2%29.pdf https://www.klippel.de/fileadmin/klippel/Files/Know_How/Application_Notes/AN_07_Weighted_Harmonic_Distortion_%28HI-2%29.pdf
\ No newline at end of file
...@@ -7,10 +7,11 @@ Created on Sept 03 2024 10:26 ...@@ -7,10 +7,11 @@ Created on Sept 03 2024 10:26
import logging import logging
from pathlib import Path from pathlib import Path
from typing import List from typing import Any, Dict, List
import hydra import hydra
from omegaconf import DictConfig import psutil
from omegaconf import DictConfig, OmegaConf
from rootutils import setup_root from rootutils import setup_root
from .calculate import NLMResult, calculate_nlm_file from .calculate import NLMResult, calculate_nlm_file
...@@ -21,14 +22,11 @@ from .utils.resolver import register_all_resolvers ...@@ -21,14 +22,11 @@ from .utils.resolver import register_all_resolvers
register_all_resolvers() # needs to be called before using OmegaConf register_all_resolvers() # needs to be called before using OmegaConf
@hydra.main(version_base="1.3", config_path="../configs", config_name="config")
def main(cfg: DictConfig) -> List[NLMResult] | NLMResult:
logger = logging.getLogger("nonlinearity") logger = logging.getLogger("nonlinearity")
try:
# check/resolve input files
results: List[NLMResult] = [] def nlm_calc(cfg: Dict[str, Any] | DictConfig, measured: str, ref: str) -> NLMResult:
for measured, ref in get_file_pairs(cfg.file_measured, cfg.file_ref): """Internal function to calculate NLM for a single file pair."""
cfg["file_measured"] = Path(measured).absolute() cfg["file_measured"] = Path(measured).absolute()
cfg["file_ref"] = Path(ref).absolute() cfg["file_ref"] = Path(ref).absolute()
...@@ -41,10 +39,35 @@ def main(cfg: DictConfig) -> List[NLMResult] | NLMResult: ...@@ -41,10 +39,35 @@ def main(cfg: DictConfig) -> List[NLMResult] | NLMResult:
# save single values to output? # save single values to output?
save_nlm_results(res, cfg) save_nlm_results(res, cfg)
# collect results return res
@hydra.main(version_base="1.3", config_path="../configs", config_name="config")
def main(cfg: DictConfig) -> List[NLMResult] | NLMResult:
try:
# check/resolve input files
file_pairs = get_file_pairs(cfg.file_measured, cfg.file_ref)
if (nbr_fp := len(file_pairs)) == 1:
# single file pair, no parallelization needed:
res = nlm_calc(cfg, *file_pairs[0])
logger.info(f"{Path(file_pairs[i][0]).name}\tNLM = {res.nlm.round(1)} dB")
return res
else:
# run in parallel for multiple file pairs
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib
n_workers = max(1, min(nbr_fp, cfg.get("n_jobs", psutil.cpu_count() // 2)))
results: List[NLMResult] = []
with tqdm_joblib(desc="NLM calculation", total=len(file_pairs)) as pb:
jobs = (delayed(nlm_calc)(OmegaConf.to_object(cfg), measured, ref) for measured, ref in file_pairs) # add copy of config to each pair
for i, res in enumerate(Parallel(n_jobs=n_workers, return_as="generator")(jobs)):
pb.set_description_str(f"{Path(file_pairs[i][0]).name}\tNLM = {res.nlm.round(1)} dB")
results.append(res) results.append(res)
return results[0] if len(results) == 1 else results return results
except Exception as e: except Exception as e:
logger.exception(f"Error during calculation: {e}") logger.exception(f"Error during calculation: {e}")
......
...@@ -5,11 +5,15 @@ Created on Jan 10 2025 09:17 ...@@ -5,11 +5,15 @@ Created on Jan 10 2025 09:17
@author: Jan.Reimes @author: Jan.Reimes
""" """
import pandas as pd import logging
import warnings import warnings
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning) import pandas as pd
warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
logger = logging.getLogger("nonlinearity.debug")
if __name__ == "__main__": if __name__ == "__main__":
pass pass
pass
...@@ -4,22 +4,32 @@ Created on Jan 10 2025 09:17 ...@@ -4,22 +4,32 @@ Created on Jan 10 2025 09:17
@author: Jan.Reimes @author: Jan.Reimes
""" """
import logging
from pathlib import Path from pathlib import Path
import pandas as pd
import numpy as np import numpy as np
import pandas as pd
from omegaconf import DictConfig from omegaconf import DictConfig
import locket
from .. import NLMResult, NLMDict from .. import NLMResult
from .utils import get_sweep_overloads from .utils import get_sweep_overloads
IDX_COLS = ['file_measured', 'file_ref', 'channel'] IDX_COLS = ["file_measured", "file_ref", "channel"]
DATA_COLS = ['NLM', 'p56.asl', 'p56.asl', 'delay', 'max_xcorr'] DATA_COLS = ["NLM", "p56.asl", "p56.act", "delay", "max_xcorr"]
logger = logging.getLogger("nonlinearity.debug")
def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None: def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
if (cfg_debug := cfg.get('debug')) and (cfg_single_values := cfg_debug.get('single_values')): if (cfg_debug := cfg.get("debug")) and (cfg_single_values := cfg_debug.get("single_values")):
precision = cfg_single_values.get('precision', 2) try:
import locket
except ImportError:
logger.warning("Some packages are missing that are required for saving spectral data. Please run 'uv sync --extra debug' or 'uv sync --all-extras'.")
return
precision = cfg_single_values.get("precision", 2)
# determine index columns; add multi-run/sweep parameters to index, if available # determine index columns; add multi-run/sweep parameters to index, if available
# (exclude sweep parameters that are already in IDX_COLS # (exclude sweep parameters that are already in IDX_COLS
...@@ -28,13 +38,13 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None: ...@@ -28,13 +38,13 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
idx_cols += list(sweep_params.keys()) idx_cols += list(sweep_params.keys())
sweep_key = tuple(sweep_params.values()) sweep_key = tuple(sweep_params.values())
single_value_file = Path(cfg_single_values.get('single_values_file', 'nlm_single_values.xlsx')) single_value_file = Path(cfg_single_values.get("single_values_file", "nlm_single_values.xlsx"))
if not single_value_file.is_absolute(): if not single_value_file.is_absolute():
output_dir = Path(cfg['paths']['output_dir']).absolute() output_dir = Path(cfg["paths"]["output_dir"]).absolute()
single_value_file = Path(output_dir / single_value_file).resolve() single_value_file = Path(output_dir / single_value_file).resolve()
# protect against concurrent write with semaphore/lock # protect against concurrent write with semaphore/lock
with locket.lock_file(single_value_file.with_suffix('.lock')): with locket.lock_file(single_value_file.with_suffix(".lock")):
# data storage - load existing (in case of e.g. multiple runs or special output file) # data storage - load existing (in case of e.g. multiple runs or special output file)
if single_value_file.is_file(): if single_value_file.is_file():
df = pd.read_excel(single_value_file, index_col=np.arange(len(idx_cols)).tolist()).infer_objects() df = pd.read_excel(single_value_file, index_col=np.arange(len(idx_cols)).tolist()).infer_objects()
...@@ -42,21 +52,23 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None: ...@@ -42,21 +52,23 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
df = pd.DataFrame(columns=idx_cols + DATA_COLS).set_index(idx_cols) df = pd.DataFrame(columns=idx_cols + DATA_COLS).set_index(idx_cols)
for ch_idx in range(results.nlm.shape[0]): for ch_idx in range(results.nlm.shape[0]):
key = (cfg['file_measured'].name, cfg['file_ref'].name, ch_idx+1) key = (cfg["file_measured"].name, cfg["file_ref"].name, ch_idx + 1)
key = key + sweep_key key = key + sweep_key
df.loc[key, "NLM"] = np.round(results.nlm[ch_idx], precision)
df.loc[key, 'NLM'] = np.round(results.nlm[ch_idx], precision)
for name, values in results.levels.asdict().items(): for name, values in results.levels.asdict().items():
df.loc[key, f'level.{name}'] = np.round(values[ch_idx], precision) df.loc[key, f"level.{name}"] = np.round(values[ch_idx], precision)
# other values # other values
df.loc[key, 'delay'] = np.round(results.delay[ch_idx], precision) df.loc[key, "delay"] = np.round(results.delay[ch_idx], precision)
df.loc[key, 'max_xcorr'] = np.round(results.max_xcorr[ch_idx], precision) df.loc[key, "max_xcorr"] = np.round(results.max_xcorr[ch_idx], precision)
df.loc[key, 'p56.act'] = np.round(results.p56.act, precision) df.loc[key, "p56.act"] = np.round(results.p56.act, precision)
df.loc[key, 'p56.asl'] = np.round(results.p56.asl, precision) df.loc[key, "p56.asl"] = np.round(results.p56.asl, precision)
df.sort_index(inplace=True) df.sort_index(inplace=True)
df.to_excel(single_value_file, merge_cells=cfg_single_values.get('merge_cells', False)) df.to_excel(
single_value_file,
merge_cells=cfg_single_values.get("merge_cells", False),
)
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -4,51 +4,74 @@ Created on Jan 10 2025 09:36 ...@@ -4,51 +4,74 @@ Created on Jan 10 2025 09:36
@author: Jan.Reimes @author: Jan.Reimes
""" """
import logging
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np
from omegaconf import DictConfig from omegaconf import DictConfig
import locket
from .. import NLMResult, NLMDict, MIN_LEVEL_RMS from .. import MIN_LEVEL_RMS, NLMResult
from .utils import get_sweep_overloads, convert_hdf5_compatible from .utils import convert_dict_to_hdf5_compatible, get_sweep_overloads, is_hydra_environment
IDX_COLS = ['file_measured', 'file_ref'] IDX_COLS = ["file_measured", "file_ref"]
logger = logging.getLogger("nonlinearity.debug")
def save_spectral_data(results: NLMResult, cfg: DictConfig) -> None: def save_spectral_data(results: NLMResult, cfg: DictConfig) -> None:
if (cfg_debug := cfg.get('debug')) and (cfg_spectrum := cfg_debug.get('spectral_data')): if (cfg_debug := cfg.get("debug")) and (cfg_spectrum := cfg_debug.get("spectral_data")):
try:
import h5py import h5py
import locket
except ImportError:
logger.warning("Some packages are missing that are required for saving spectral data. Please run 'uv sync --extra debug' or 'uv sync --all-extras'.")
return
# get metadata for sweep, including file names # get metadata for sweep, including file names
sweep_params = {k: cfg[k] for k in IDX_COLS} sweep_params = {k: cfg[k] for k in IDX_COLS}
sweep_params.update(get_sweep_overloads(cfg, include_params=sweep_params, hdf5_compatible=True)) if is_hydra_environment():
sweep_params.update(get_sweep_overloads(cfg, include_params=sweep_params))
sweep_params = convert_dict_to_hdf5_compatible(sweep_params)
spectral_data_output_file = Path(cfg_spectrum.get('spectral_data_file', 'spectral_data.h5')) spectral_data_output_file = Path(cfg_spectrum.get("spectral_data_file", "spectral_data.h5"))
if not spectral_data_output_file.is_absolute(): if not spectral_data_output_file.is_absolute():
output_dir = Path(cfg['paths']['output_dir']).absolute() output_dir = Path(cfg["paths"]["output_dir"]).absolute()
spectral_data_output_file = Path(output_dir / spectral_data_output_file).resolve() spectral_data_output_file = Path(output_dir / spectral_data_output_file).resolve()
# protect against concurrent write with semaphore/lock # protect against concurrent write with semaphore/lock
with locket.lock_file(spectral_data_output_file.with_suffix('.lock')): with locket.lock_file(spectral_data_output_file.with_suffix(".lock")):
# data storage - load existing (in case of e.g. multiple runs or special output file) # data storage - load existing (in case of e.g. multiple runs or special output file)
with h5py.File(spectral_data_output_file, 'a') as f: with h5py.File(spectral_data_output_file, "a") as f:
# base group # base group
g = f.require_group(cfg['file_measured'].name).require_group(cfg['file_ref'].name) g = f.require_group(cfg["file_measured"].name).require_group(cfg["file_ref"].name)
# store possible sweep runs with index # store possible sweep runs with index
next_key = len(g.keys()) next_key = len(g.keys())
g = g.require_group(f'{next_key}') g = g.require_group(f"{next_key}")
# store sweep parameters as metadata # store sweep parameters as metadata
g.attrs.update(sweep_params) g.attrs.update(sweep_params)
# store frequency vector # store frequency vector
g.require_dataset('freq', data=results.freq, dtype=results.freq.dtype, shape=results.freq.shape, chunks=True) g.require_dataset(
"freq",
data=results.freq,
dtype=results.freq.dtype,
shape=results.freq.shape,
chunks=True,
)
# iterate over spectrum types # iterate over spectrum types
for name, spectrum in results.spectra.asdict().items(): for name, spectrum in results.spectra.asdict().items():
if cfg_spectrum.get('in_db', False): if cfg_spectrum.get("in_db", False):
spectrum = 20 * np.log10(np.maximum(spectrum, MIN_LEVEL_RMS)) spectrum = 20 * np.log10(np.maximum(spectrum, MIN_LEVEL_RMS))
g.require_dataset(name, data=spectrum, dtype=spectrum.dtype, shape=spectrum.shape, chunks=True) g.require_dataset(
name,
data=spectrum,
dtype=spectrum.dtype,
shape=spectrum.shape,
chunks=True,
)
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -4,32 +4,33 @@ Created on Jan 10 2025 16:13 ...@@ -4,32 +4,33 @@ Created on Jan 10 2025 16:13
@author: Jan.Reimes @author: Jan.Reimes
""" """
from types import NoneType from types import NoneType
from typing import Any, Dict, Iterable, List, Optional
import numpy as np import numpy as np
from pathlib import Path from hydra.core.hydra_config import HydraConf, HydraConfig
from typing import Dict, Any, Optional, List, Iterable
from omegaconf import DictConfig, OmegaConf from omegaconf import DictConfig, OmegaConf
from hydra.core.hydra_config import HydraConfig, HydraConf
_CONVERT_DTYPES = { from . import logger
np.dtype('O'): str,
np.dtype('bool'): np.uint8, _CONVERT_DTYPES = {np.dtype("O"): str, np.dtype("bool"): np.uint8, NoneType: str}
NoneType: str
}
def is_hydra_environment() -> bool:
"""Check if the current environment is a Hydra environment."""
return HydraConfig.instance().cfg is not None # and HydraConfig.instance().job is not None
def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = None,
include_params: Optional[Dict[str, Any]] = None,
hdf5_compatible: bool = False) -> Dict[str, Any]:
def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = None, include_params: Optional[Dict[str, Any]] = None, hdf5_compatible: bool = False) -> Dict[str, Any]:
exclude_params = exclude_params or [] exclude_params = exclude_params or []
overloads = (include_params or {}).copy() overloads = (include_params or {}).copy()
if is_hydra_environment():
hydra_cfg: HydraConf = HydraConfig.get() hydra_cfg: HydraConf = HydraConfig.get()
if (sweep_config := hydra_cfg.get('sweeper')) is not None: if (sweep_config := hydra_cfg.get("sweeper")) is not None:
# read sweep parameters from these keys: # read sweep parameters from these keys:
for attr in ['grid_params', 'list_params', 'params']: for attr in ["grid_params", "list_params", "params"]:
if sweep_params := sweep_config.get(attr): if sweep_params := sweep_config.get(attr):
# get key value from hydra-sweep-config and value from actual config # get key value from hydra-sweep-config and value from actual config
for p in sweep_params.keys(): for p in sweep_params.keys():
...@@ -37,13 +38,21 @@ def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = N ...@@ -37,13 +38,21 @@ def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = N
if p not in exclude_params: if p not in exclude_params:
overloads[p] = OmegaConf.select(cfg, p) overloads[p] = OmegaConf.select(cfg, p)
else:
logger.warning("No HydraConfig found, sweep parameters not available.")
if hdf5_compatible: if hdf5_compatible:
# convert (only) values to hdf5 compatible types overloads = convert_dict_to_hdf5_compatible(overloads)
overloads = dict(zip(overloads.keys(), convert_hdf5_compatible(overloads.values())))
return overloads return overloads
def convert_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
def convert_dict_to_hdf5_compatible(d: Dict[Any, Any]) -> List[Any]:
# convert (only) values to hdf5 compatible types
return dict(zip(d.keys(), convert_values_to_hdf5_compatible(d.values())))
def convert_values_to_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
converted = [] converted = []
for v in values: for v in values:
if fn := _CONVERT_DTYPES.get(type(v)): if fn := _CONVERT_DTYPES.get(type(v)):
...@@ -56,5 +65,6 @@ def convert_hdf5_compatible(values: Iterable[Any]) -> List[Any]: ...@@ -56,5 +65,6 @@ def convert_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
return converted return converted
if __name__ == "__main__": if __name__ == "__main__":
pass pass
...@@ -19,16 +19,19 @@ dependencies = [ ...@@ -19,16 +19,19 @@ dependencies = [
"hydra-list-sweeper>=1.0.1", "hydra-list-sweeper>=1.0.1",
"hydra-callbacks>=0.6.1", "hydra-callbacks>=0.6.1",
"hydra-joblib-launcher>=1.2.0", "hydra-joblib-launcher>=1.2.0",
"locket>=1.0.0",
"python-calamine>=0.3.1", "python-calamine>=0.3.1",
"xlsxwriter>=3.2.2", "xlsxwriter>=3.2.2",
"dependency-groups>=1.3.0", "dependency-groups>=1.3.0",
"numba>=0.61.2", "numba>=0.61.2",
"acoustics>=0.2.6", "acoustics>=0.2.6",
"tqdm>=4.67.1",
"tqdm-joblib>=0.0.4",
"psutil>=7.0.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]
debug = [ debug = [
"locket>=1.0.0",
"h5py>=3.13.0", "h5py>=3.13.0",
"matplotlib>=3.10.1", "matplotlib>=3.10.1",
"seaborn>=0.13.2", "seaborn>=0.13.2",
...@@ -58,3 +61,10 @@ addopts = [ ...@@ -58,3 +61,10 @@ addopts = [
[tool.uv] [tool.uv]
link-mode = "symlink" link-mode = "symlink"
[tool.pyright]
typeCheckingMode = "off"
ignore = ["*"]
[tool.ruff]
line-length = 188