Skip to content
Commits on Source (3)
# Nonlinearity Measure - Reference implementation of TS 104063
# Nonlinearity Measure - Reference implementation of ETSI TS 104 063
Code for non-linearity measure for upcoming ETSI TS 104 063. Includes also verification and testing with simple non-linear models.
......@@ -56,12 +56,53 @@ Configuration of the whole project is based on `hydra-core` package, see https:/
# API access
See `calculate_nlm_file(..)` for file-based calculations or `calculate_nlm(..)` for numpy/array-based calculations in `nonlinearity.calculate.py`.
# Debug data
Dependencies for showing and saving extended result data are installed as follows:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --extra debug
~~~
Or install all extra dependencies:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --all-extras
~~~
# Advanced
The ITU-T P.56 active speech level implementation in Python is rather slow. You may compile it for better performance.
Ensure that all development dependencies are installed:
~~~terminal
../ts104063-nonlinearity-measure> uv sync --dev
~~~
Compile with e.g., nuitka:
~~~terminal
../ts104063-nonlinearity-measure> uv run nuitka --module nonlinearity/utils/p56/asl.py --include-module=numpy --output-dir=nonlinearity/utils/p56/build
[ ... ]
../ts104063-nonlinearity-measure> uv run nuitka --module nonlinearity/utils/p56/prefilter.py --include-module=numpy --include-module=scipy.signal --output-dir=nonlinearity/utils/p56/build --remove-output
~~~
# References
(list is for sure incomplete; further input is appreciated!)
## Sweep-based system identification
https://ant-novak.com/posts/research/2015-10-30_JAES_Swept/
https://hal.science/hal-02504303/document
https://www.melaudia.net/zdoc/sweepSine.PDF
## Non-linearities & Modelling
https://www.researchgate.net/publication/286670404_Characterisation_and_modelling_of_non-linear_loudspeakers
https://www.hsu-hh.de/ant/wp-content/uploads/sites/699/2018/04/Eichas_VA_Modeling_of_guitar_amps_with_WH_models.pdf
https://openhsu.ub.hsu-hh.de/server/api/core/bitstreams/0b6f9c6f-c444-45f7-b392-e2ff4188d25b/content
https://hal.science/hal-02504303/document
## Related Distortion Measures
https://www.listeninc.com/wp/media/2023/06/paper_121AES_non_coherent_distortion_Listen_Inc.pdf
https://www.klippel.de/fileadmin/klippel/Files/Know_How/Application_Notes/AN_07_Weighted_Harmonic_Distortion_%28HI-2%29.pdf
\ No newline at end of file
......@@ -7,10 +7,11 @@ Created on Sept 03 2024 10:26
import logging
from pathlib import Path
from typing import List
from typing import Any, Dict, List
import hydra
from omegaconf import DictConfig
import psutil
from omegaconf import DictConfig, OmegaConf
from rootutils import setup_root
from .calculate import NLMResult, calculate_nlm_file
......@@ -21,14 +22,11 @@ from .utils.resolver import register_all_resolvers
register_all_resolvers() # needs to be called before using OmegaConf
@hydra.main(version_base="1.3", config_path="../configs", config_name="config")
def main(cfg: DictConfig) -> List[NLMResult] | NLMResult:
logger = logging.getLogger("nonlinearity")
try:
# check/resolve input files
results: List[NLMResult] = []
for measured, ref in get_file_pairs(cfg.file_measured, cfg.file_ref):
def nlm_calc(cfg: Dict[str, Any] | DictConfig, measured: str, ref: str) -> NLMResult:
"""Internal function to calculate NLM for a single file pair."""
cfg["file_measured"] = Path(measured).absolute()
cfg["file_ref"] = Path(ref).absolute()
......@@ -41,10 +39,35 @@ def main(cfg: DictConfig) -> List[NLMResult] | NLMResult:
# save single values to output?
save_nlm_results(res, cfg)
# collect results
return res
@hydra.main(version_base="1.3", config_path="../configs", config_name="config")
def main(cfg: DictConfig) -> List[NLMResult] | NLMResult:
try:
# check/resolve input files
file_pairs = get_file_pairs(cfg.file_measured, cfg.file_ref)
if (nbr_fp := len(file_pairs)) == 1:
# single file pair, no parallelization needed:
res = nlm_calc(cfg, *file_pairs[0])
logger.info(f"{Path(file_pairs[i][0]).name}\tNLM = {res.nlm.round(1)} dB")
return res
else:
# run in parallel for multiple file pairs
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib
n_workers = max(1, min(nbr_fp, cfg.get("n_jobs", psutil.cpu_count() // 2)))
results: List[NLMResult] = []
with tqdm_joblib(desc="NLM calculation", total=len(file_pairs)) as pb:
jobs = (delayed(nlm_calc)(OmegaConf.to_object(cfg), measured, ref) for measured, ref in file_pairs) # add copy of config to each pair
for i, res in enumerate(Parallel(n_jobs=n_workers, return_as="generator")(jobs)):
pb.set_description_str(f"{Path(file_pairs[i][0]).name}\tNLM = {res.nlm.round(1)} dB")
results.append(res)
return results[0] if len(results) == 1 else results
return results
except Exception as e:
logger.exception(f"Error during calculation: {e}")
......
......@@ -5,11 +5,15 @@ Created on Jan 10 2025 09:17
@author: Jan.Reimes
"""
import pandas as pd
import logging
import warnings
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)
import pandas as pd
warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
logger = logging.getLogger("nonlinearity.debug")
if __name__ == "__main__":
pass
pass
......@@ -4,22 +4,32 @@ Created on Jan 10 2025 09:17
@author: Jan.Reimes
"""
import logging
from pathlib import Path
import pandas as pd
import numpy as np
import pandas as pd
from omegaconf import DictConfig
import locket
from .. import NLMResult, NLMDict
from .. import NLMResult
from .utils import get_sweep_overloads
IDX_COLS = ['file_measured', 'file_ref', 'channel']
DATA_COLS = ['NLM', 'p56.asl', 'p56.asl', 'delay', 'max_xcorr']
IDX_COLS = ["file_measured", "file_ref", "channel"]
DATA_COLS = ["NLM", "p56.asl", "p56.act", "delay", "max_xcorr"]
logger = logging.getLogger("nonlinearity.debug")
def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
if (cfg_debug := cfg.get('debug')) and (cfg_single_values := cfg_debug.get('single_values')):
precision = cfg_single_values.get('precision', 2)
if (cfg_debug := cfg.get("debug")) and (cfg_single_values := cfg_debug.get("single_values")):
try:
import locket
except ImportError:
logger.warning("Some packages are missing that are required for saving spectral data. Please run 'uv sync --extra debug' or 'uv sync --all-extras'.")
return
precision = cfg_single_values.get("precision", 2)
# determine index columns; add multi-run/sweep parameters to index, if available
# (exclude sweep parameters that are already in IDX_COLS
......@@ -28,13 +38,13 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
idx_cols += list(sweep_params.keys())
sweep_key = tuple(sweep_params.values())
single_value_file = Path(cfg_single_values.get('single_values_file', 'nlm_single_values.xlsx'))
single_value_file = Path(cfg_single_values.get("single_values_file", "nlm_single_values.xlsx"))
if not single_value_file.is_absolute():
output_dir = Path(cfg['paths']['output_dir']).absolute()
output_dir = Path(cfg["paths"]["output_dir"]).absolute()
single_value_file = Path(output_dir / single_value_file).resolve()
# protect against concurrent write with semaphore/lock
with locket.lock_file(single_value_file.with_suffix('.lock')):
with locket.lock_file(single_value_file.with_suffix(".lock")):
# data storage - load existing (in case of e.g. multiple runs or special output file)
if single_value_file.is_file():
df = pd.read_excel(single_value_file, index_col=np.arange(len(idx_cols)).tolist()).infer_objects()
......@@ -42,21 +52,23 @@ def save_nlm_results(results: NLMResult, cfg: DictConfig) -> None:
df = pd.DataFrame(columns=idx_cols + DATA_COLS).set_index(idx_cols)
for ch_idx in range(results.nlm.shape[0]):
key = (cfg['file_measured'].name, cfg['file_ref'].name, ch_idx+1)
key = (cfg["file_measured"].name, cfg["file_ref"].name, ch_idx + 1)
key = key + sweep_key
df.loc[key, 'NLM'] = np.round(results.nlm[ch_idx], precision)
df.loc[key, "NLM"] = np.round(results.nlm[ch_idx], precision)
for name, values in results.levels.asdict().items():
df.loc[key, f'level.{name}'] = np.round(values[ch_idx], precision)
df.loc[key, f"level.{name}"] = np.round(values[ch_idx], precision)
# other values
df.loc[key, 'delay'] = np.round(results.delay[ch_idx], precision)
df.loc[key, 'max_xcorr'] = np.round(results.max_xcorr[ch_idx], precision)
df.loc[key, 'p56.act'] = np.round(results.p56.act, precision)
df.loc[key, 'p56.asl'] = np.round(results.p56.asl, precision)
df.loc[key, "delay"] = np.round(results.delay[ch_idx], precision)
df.loc[key, "max_xcorr"] = np.round(results.max_xcorr[ch_idx], precision)
df.loc[key, "p56.act"] = np.round(results.p56.act, precision)
df.loc[key, "p56.asl"] = np.round(results.p56.asl, precision)
df.sort_index(inplace=True)
df.to_excel(single_value_file, merge_cells=cfg_single_values.get('merge_cells', False))
df.to_excel(
single_value_file,
merge_cells=cfg_single_values.get("merge_cells", False),
)
if __name__ == "__main__":
......
......@@ -4,51 +4,74 @@ Created on Jan 10 2025 09:36
@author: Jan.Reimes
"""
import logging
from pathlib import Path
import numpy as np
from omegaconf import DictConfig
import locket
from .. import NLMResult, NLMDict, MIN_LEVEL_RMS
from .utils import get_sweep_overloads, convert_hdf5_compatible
from .. import MIN_LEVEL_RMS, NLMResult
from .utils import convert_dict_to_hdf5_compatible, get_sweep_overloads, is_hydra_environment
IDX_COLS = ['file_measured', 'file_ref']
IDX_COLS = ["file_measured", "file_ref"]
logger = logging.getLogger("nonlinearity.debug")
def save_spectral_data(results: NLMResult, cfg: DictConfig) -> None:
if (cfg_debug := cfg.get('debug')) and (cfg_spectrum := cfg_debug.get('spectral_data')):
if (cfg_debug := cfg.get("debug")) and (cfg_spectrum := cfg_debug.get("spectral_data")):
try:
import h5py
import locket
except ImportError:
logger.warning("Some packages are missing that are required for saving spectral data. Please run 'uv sync --extra debug' or 'uv sync --all-extras'.")
return
# get metadata for sweep, including file names
sweep_params = {k: cfg[k] for k in IDX_COLS}
sweep_params.update(get_sweep_overloads(cfg, include_params=sweep_params, hdf5_compatible=True))
if is_hydra_environment():
sweep_params.update(get_sweep_overloads(cfg, include_params=sweep_params))
sweep_params = convert_dict_to_hdf5_compatible(sweep_params)
spectral_data_output_file = Path(cfg_spectrum.get('spectral_data_file', 'spectral_data.h5'))
spectral_data_output_file = Path(cfg_spectrum.get("spectral_data_file", "spectral_data.h5"))
if not spectral_data_output_file.is_absolute():
output_dir = Path(cfg['paths']['output_dir']).absolute()
output_dir = Path(cfg["paths"]["output_dir"]).absolute()
spectral_data_output_file = Path(output_dir / spectral_data_output_file).resolve()
# protect against concurrent write with semaphore/lock
with locket.lock_file(spectral_data_output_file.with_suffix('.lock')):
with locket.lock_file(spectral_data_output_file.with_suffix(".lock")):
# data storage - load existing (in case of e.g. multiple runs or special output file)
with h5py.File(spectral_data_output_file, 'a') as f:
with h5py.File(spectral_data_output_file, "a") as f:
# base group
g = f.require_group(cfg['file_measured'].name).require_group(cfg['file_ref'].name)
g = f.require_group(cfg["file_measured"].name).require_group(cfg["file_ref"].name)
# store possible sweep runs with index
next_key = len(g.keys())
g = g.require_group(f'{next_key}')
g = g.require_group(f"{next_key}")
# store sweep parameters as metadata
g.attrs.update(sweep_params)
# store frequency vector
g.require_dataset('freq', data=results.freq, dtype=results.freq.dtype, shape=results.freq.shape, chunks=True)
g.require_dataset(
"freq",
data=results.freq,
dtype=results.freq.dtype,
shape=results.freq.shape,
chunks=True,
)
# iterate over spectrum types
for name, spectrum in results.spectra.asdict().items():
if cfg_spectrum.get('in_db', False):
if cfg_spectrum.get("in_db", False):
spectrum = 20 * np.log10(np.maximum(spectrum, MIN_LEVEL_RMS))
g.require_dataset(name, data=spectrum, dtype=spectrum.dtype, shape=spectrum.shape, chunks=True)
g.require_dataset(
name,
data=spectrum,
dtype=spectrum.dtype,
shape=spectrum.shape,
chunks=True,
)
if __name__ == "__main__":
......
......@@ -4,32 +4,33 @@ Created on Jan 10 2025 16:13
@author: Jan.Reimes
"""
from types import NoneType
from typing import Any, Dict, Iterable, List, Optional
import numpy as np
from pathlib import Path
from typing import Dict, Any, Optional, List, Iterable
from hydra.core.hydra_config import HydraConf, HydraConfig
from omegaconf import DictConfig, OmegaConf
from hydra.core.hydra_config import HydraConfig, HydraConf
_CONVERT_DTYPES = {
np.dtype('O'): str,
np.dtype('bool'): np.uint8,
NoneType: str
}
from . import logger
_CONVERT_DTYPES = {np.dtype("O"): str, np.dtype("bool"): np.uint8, NoneType: str}
def is_hydra_environment() -> bool:
"""Check if the current environment is a Hydra environment."""
return HydraConfig.instance().cfg is not None # and HydraConfig.instance().job is not None
def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = None,
include_params: Optional[Dict[str, Any]] = None,
hdf5_compatible: bool = False) -> Dict[str, Any]:
def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = None, include_params: Optional[Dict[str, Any]] = None, hdf5_compatible: bool = False) -> Dict[str, Any]:
exclude_params = exclude_params or []
overloads = (include_params or {}).copy()
if is_hydra_environment():
hydra_cfg: HydraConf = HydraConfig.get()
if (sweep_config := hydra_cfg.get('sweeper')) is not None:
if (sweep_config := hydra_cfg.get("sweeper")) is not None:
# read sweep parameters from these keys:
for attr in ['grid_params', 'list_params', 'params']:
for attr in ["grid_params", "list_params", "params"]:
if sweep_params := sweep_config.get(attr):
# get key value from hydra-sweep-config and value from actual config
for p in sweep_params.keys():
......@@ -37,13 +38,21 @@ def get_sweep_overloads(cfg: DictConfig, exclude_params: Optional[List[str]] = N
if p not in exclude_params:
overloads[p] = OmegaConf.select(cfg, p)
else:
logger.warning("No HydraConfig found, sweep parameters not available.")
if hdf5_compatible:
# convert (only) values to hdf5 compatible types
overloads = dict(zip(overloads.keys(), convert_hdf5_compatible(overloads.values())))
overloads = convert_dict_to_hdf5_compatible(overloads)
return overloads
def convert_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
def convert_dict_to_hdf5_compatible(d: Dict[Any, Any]) -> List[Any]:
# convert (only) values to hdf5 compatible types
return dict(zip(d.keys(), convert_values_to_hdf5_compatible(d.values())))
def convert_values_to_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
converted = []
for v in values:
if fn := _CONVERT_DTYPES.get(type(v)):
......@@ -56,5 +65,6 @@ def convert_hdf5_compatible(values: Iterable[Any]) -> List[Any]:
return converted
if __name__ == "__main__":
pass
......@@ -19,16 +19,19 @@ dependencies = [
"hydra-list-sweeper>=1.0.1",
"hydra-callbacks>=0.6.1",
"hydra-joblib-launcher>=1.2.0",
"locket>=1.0.0",
"python-calamine>=0.3.1",
"xlsxwriter>=3.2.2",
"dependency-groups>=1.3.0",
"numba>=0.61.2",
"acoustics>=0.2.6",
"tqdm>=4.67.1",
"tqdm-joblib>=0.0.4",
"psutil>=7.0.0",
]
[project.optional-dependencies]
debug = [
"locket>=1.0.0",
"h5py>=3.13.0",
"matplotlib>=3.10.1",
"seaborn>=0.13.2",
......@@ -58,3 +61,10 @@ addopts = [
[tool.uv]
link-mode = "symlink"
[tool.pyright]
typeCheckingMode = "off"
ignore = ["*"]
[tool.ruff]
line-length = 188