Commit 1031d29b authored by Jean Rebiffe's avatar Jean Rebiffe
Browse files

mypy & readability implement of lncc.py

parent 09f21701
Loading
Loading
Loading
Loading
+92 −70
Original line number Diff line number Diff line
#!/usr/bin/env python3
"""Lightweight NETCONF Controller (LNCC)

Emulates a real SDN Controlleur by pushing NETCONF operations, ...
Emulates a real SDN Controller by pushing NETCONF operations, ...

License
=======
@@ -17,41 +17,47 @@ the text of which is available at https://opensource.org/licenses/BSD-3-Clause
or see the "license.txt" file for more details.

Author: Jean Rebiffe, Orange Innovation Networks
Software description: Emulates a real SDN Controlleur by pushing NETCONF
Software description: Emulates a real SDN Controller by pushing NETCONF
operations, ...
"""

__version__ = "0.3.0"
__version__ = "0.3.1"
__author__ = "Jean Rebiffe, Orange Innovation Networks, 2022"

import argparse
import collections
import logging
import logging.config
import sys
from collections import ChainMap, UserDict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple, cast
from typing import Any, Dict, List, Optional, Set, Tuple

import cmd2
import ncclient
import pandas
import yaml
from ncclient import manager

import cmd2

LOGGER = logging.getLogger("lncc")

# https://datatracker.ietf.org/doc/html/rfc8342#section-5.1
DATASTORES = {"running", "candidate", "startup", "intended"}


class LnccError(Exception):
    ...


class TemplateError(LnccError):
    pass


# https://datatracker.ietf.org/doc/html/rfc8342#section-5.1
DATASTORES = {"running", "candidate", "startup", "intended"}
    """Error when performing a XML templating.

    May appear on `NetworkElement.build_config()` for 2 reasons:
    - the mapping parameters include inexistant mapping name
    - the XML template contains a variable which doesnt appears in mappings
    Typically during <edit-config> operation
    command: `netconf edit-config` with `--template-config` and
    `--mapping` options.
    """


@dataclass
@@ -69,25 +75,22 @@ class NetworkElement:
    name: str
    params: Dict[str, str] = field(compare=False)
    conn: Optional[manager.Manager] = field(default=None, compare=False)
    replies: Dict[str,
                           ncclient.operations.retrieve.GetReply] = field(
    replies: Dict[str, ncclient.operations.retrieve.GetReply] = field(
        default_factory=dict, compare=False)
    tables: Dict[str, pandas.DataFrame] = field(default_factory=dict,
                                                compare=False)
    configs: Dict[str, str] = field(default_factory=dict,
                                              compare=False)
    configs: Dict[str, str] = field(default_factory=dict, compare=False)
    mappings: Dict[str, Dict[str, str]] = field(default_factory=dict,
                                                compare=False)

    def to_dict(self) -> dict:
        dic: dict = {"params": self.params}
    def to_dict(self) -> Dict[str, Any]:
        dic: Dict[str, Any] = {"params": self.params}
        if self.mappings:
            dic["mappings"] = self.mappings
        return dic

    def connect(self) -> manager.Manager:
        """Initiate NETCONF connection"""
        # TODO(Jean): fix vendor-specific hook - needed on my lab
        LOGGER.debug("Start: NETCONF connect() to %s", self.name)
        try:
            self.conn = manager.connect_ssh(**self.params,
@@ -106,7 +109,10 @@ class NetworkElement:

        return self.conn

    def netconf_operation(self, operation: str, reply: str = None, **kwargs):
    def netconf_operation(self,
                          operation: str,
                          reply: Optional[str] = None,
                          **kwargs) -> None:
        if not reply:
            reply = operation

@@ -139,51 +145,66 @@ class NetworkElement:

        LOGGER.debug("Success: NETCONF <%s> on %s", operation, self.name)

    def build_config(self, config_template: str, mappings: Set[str],
                     group_mappings: dict) -> None:
        avails = self.mappings.copy()
        avails.update(self.mappings)

    def build_config(
        self,
        config_template: str,
        mappings: Set[str],
        group_mappings: Dict[str, Dict[str, str]],
    ) -> None:
        assert isinstance(config_template, str)

        unknwon = set(filter(lambda mapp: mapp not in avails, mappings))
        if unknwon:
        # avails = self.mappings.copy()
        # avails.update(self.mappings)
        # unknwon = set(filter(lambda mapp: mapp not in avails, mappings))
        # if unknwon:
        #    raise KeyError(f"Unknwon mapping {unknwon} for NE {self.name} "
        #                   f"Available mappings: {set(avails)}")
        # usables: dict = dict(filter(lambda a: a[0] in mappings,
        #                            avails.items()))
        # mapping = dict(collections.ChainMap(*usables.values()))

        # TODO(jean): To use all mapping by default
        maps = ChainMap(self.mappings, group_mappings)

        if not mappings.issubset(maps):
            # TODO(Jean): build own Exception?
            raise KeyError(f"Unknwon mapping {unknwon} for NE {self.name} "
                           f"Available mappings: {set(avails)}")
            raise TemplateError(f"Unknwon mapping {mappings.difference(maps)} "
                                f"for NE {self.name}. "
                                f"Available mappings: {set(maps)}")

        usables: dict = dict(filter(lambda a: a[0] in mappings,
                                    avails.items()))
        mapping = dict(collections.ChainMap(*usables.values()))
        usables = [maps[key] for key in mappings]
        fmtmap = ChainMap(*usables)

        try:
            self.configs["default"] = config_template.format(**mapping)
            self.configs["default"] = config_template.format_map(fmtmap)
        except KeyError as err:
            raise TemplateError(
                f"Cannot build template, {err} missing") from err
                f"Cannot build template, {err} missing for NE {self.name}"
            ) from err

    def table_from_xml(self,
    def table_from_xml(
        self,
        table: str = "get",
                       reply: str = None,
                       xsl_transform: str = None) -> None:
        reply: Optional[str] = None,
        xsl_transform: Optional[str] = None,
    ) -> None:
        """Build a panda DataFrame, based on previous XML ouput"""
        if reply is None:
            reply = table

        # TODO(Jean): Fix mypy compains with pandas
        new_table: pandas.DataFrame = pandas.read_xml(
            self.replies[reply].data_xml, stylesheet=xsl_transform)
        self.tables[table] = cast(pandas.DataFrame, new_table)
        self.tables[table] = pandas.read_xml(self.replies[reply].data_xml,
                                             stylesheet=xsl_transform)


class NetworkElementGroup(collections.UserDict):
class NetworkElementGroup(UserDict):
    """Represent a Group of Network Element"""

    # TODO(Jean): Correct static dict in class declaration
    tables: Dict[str, pandas.DataFrame] = {}
    mappings: Dict[str, Dict[str, str]] = {}

    @staticmethod
    def from_dict(dic: dict) -> "NetworkElementGroup":
    def from_dict(dic: Dict[str, Any]) -> "NetworkElementGroup":
        """Build dict, that can be dumped in a file (YAML/JSON/...)"""
        assert "nes" in dic

@@ -203,9 +224,9 @@ class NetworkElementGroup(collections.UserDict):

        return group

    def to_dict(self) -> dict:
    def to_dict(self) -> Dict[str, Dict[str, Any]]:
        """Build dict, that can be loaded from a file (YAML/JSON/...)"""
        dic: dict = {"nes": {}}
        dic: Dict[str, Dict[str, Any]] = {"nes": {}}
        if self.mappings:
            dic["mappings"] = self.mappings
        for name, n_e in self.items():
@@ -239,10 +260,12 @@ class NetworkElementGroup(collections.UserDict):
        for n_e in self.values():
            n_e.build_config(config_template, mappings, self.mappings)

    def table_from_xml(self,
    def table_from_xml(
        self,
        table: str = "get",
                       reply: str = None,
                       xsl_transform: str = None) -> None:
        reply: Optional[str] = None,
        xsl_transform: Optional[str] = None,
    ) -> None:
        for n_e in self.data.values():
            n_e.table_from_xml(table, reply, xsl_transform)

@@ -250,7 +273,6 @@ class NetworkElementGroup(collections.UserDict):
        # TODO(Jean): Refactoring needed?
        tables = []
        for name, state in self.items():
            # print(state)
            tmp_tb = state.tables[table].copy()
            tmp_tb["origin"] = name
            tables.append(tmp_tb)
@@ -261,7 +283,7 @@ class NetworkElementGroup(collections.UserDict):

    def table_to_excel(self,
                       excel_writer: str = "get.xlsx",
                       tables: List[str] = None):
                       tables: Optional[List[str]] = None) -> None:
        if not tables:
            tables = ["get"]

@@ -276,17 +298,18 @@ class LnccCli(cmd2.Cmd):
    The LNNC command-line interface is build around "cmd2" package

    >>> import sys
    >>> sys.exit(LnccCli().cmdloop())
    >>> app = LnccCli()
    >>> sys.exit(app.cmdloop())
    """

    nes: NetworkElementGroup = NetworkElementGroup()
    tables: Dict[str, pandas.DataFrame] = dict()
    tables: Dict[str, pandas.DataFrame] = {}
    logging_config = None

    intro = ("Welcome to the Lightweight NETCONF Controller lncc.py\n"
             "Type 'help' for more information")

    def __init__(self, *args, **kwargs):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        logg = cmd2.Settable(
            name="logging_config",
@@ -298,7 +321,7 @@ class LnccCli(cmd2.Cmd):
        self.add_settable(logg)

    @staticmethod
    def _onchange_logging(param_name, old, new):
    def _onchange_logging(param_name, old, new) -> None:
        del param_name, old  # Unused
        # import io
        # assert isinstance(new, io.TextIOWrapper)
@@ -332,7 +355,7 @@ class LnccCli(cmd2.Cmd):

        self.pfeedback(f"NE {n_e.name} added, current: {len(self.nes)} NEs")

    def ne_delete(self, args: argparse.Namespace):
    def ne_delete(self, args: argparse.Namespace) -> None:
        assert hasattr(args, "name")
        unknown = list(filter(lambda name: name not in self.nes, args.name))
        if unknown:
@@ -356,7 +379,7 @@ class LnccCli(cmd2.Cmd):
        self.pfeedback(f"Deleted {len(args.name)} NEs: {args.name}, "
                       f"current: {len(self.nes)} NEs")

    def ne_load(self, args: argparse.Namespace):
    def ne_load(self, args: argparse.Namespace) -> None:
        assert hasattr(args, "file")

        with args.file:
@@ -374,7 +397,7 @@ class LnccCli(cmd2.Cmd):
        self.pfeedback("Loading complete: "
                       f"{len(news)} new NEs: {list(news.keys())}")

    def ne_dump(self, args: argparse.Namespace):
    def ne_dump(self, args: argparse.Namespace) -> None:
        nes_dict = self.nes.to_dict()

        if args.file:
@@ -383,7 +406,7 @@ class LnccCli(cmd2.Cmd):
        else:
            self.poutput(yaml.safe_dump(nes_dict))

    def ne_print(self, args: argparse.Namespace):
    def ne_print(self, args: argparse.Namespace) -> None:
        assert args.name
        tables = list(self.nes[args.name].tables.keys())
        configs = list(self.nes[args.name].configs.keys())
@@ -429,7 +452,7 @@ class LnccCli(cmd2.Cmd):

    @cmd2.with_argparser(ne_parser)
    @cmd2.with_category("Parameter management")
    def do_ne(self, args: argparse.Namespace):
    def do_ne(self, args: argparse.Namespace) -> None:
        if not hasattr(args, "func"):
            self.do_help("netconf")
            return
@@ -438,7 +461,7 @@ class LnccCli(cmd2.Cmd):

    # Mapping command and subcommands

    def mapping_add(self, args: argparse.Namespace):
    def mapping_add(self, args: argparse.Namespace) -> None:
        assert hasattr(args, "name")
        assert hasattr(args, "mapping")
        assert hasattr(args, "ne")
@@ -464,7 +487,7 @@ class LnccCli(cmd2.Cmd):
        else:
            self.nes.mappings[args.name] = maps

    def mapping_print(self, args: argparse.Namespace):
    def mapping_print(self, args: argparse.Namespace) -> None:
        del args  # Unued
        self.poutput(f"Group mapping: {self.nes.mappings}")
        for name, n_e in self.nes.items():
@@ -484,7 +507,7 @@ class LnccCli(cmd2.Cmd):

    @cmd2.with_argparser(mapping_parser)
    @cmd2.with_category("Parameter management")
    def do_mapping(self, args: argparse.Namespace):
    def do_mapping(self, args: argparse.Namespace) -> None:
        if not hasattr(args, "func"):
            self.do_help("mapping")
            return
@@ -522,6 +545,7 @@ class LnccCli(cmd2.Cmd):
    _nc_get_config_parser.add_argument("--filter-file",
                                       metavar="XML-FILE",
                                       type=argparse.FileType("r"))
    _nc_get_config_parser.add_argument("--source", required=True, choices=DATASTORES)
    _nc_get_config_parser.set_defaults(operation="get-config")

    _nc_edit_parser = _netconf_subparsers.add_parser("edit-config")
@@ -612,7 +636,6 @@ class LnccCli(cmd2.Cmd):
        items3 = filter(lambda item: item[1] is not None, items2)
        kwargs = dict(items3)


        if "reply" not in kwargs:
            kwargs["reply"] = kwargs["operation"]

@@ -646,7 +669,6 @@ class LnccCli(cmd2.Cmd):

        kwargs = {"table": args.table}

        # kwargs["reply"] = args.reply if args.reply else args.table
        kwargs["reply"] = args.reply or args.table

        if args.xsl_transform:
@@ -753,7 +775,7 @@ class LnccCli(cmd2.Cmd):
            self.pfeedback(f"Displayed {kwargs['reply']}")


def main():
def main() -> None:
    app = LnccCli(persistent_history_file=".lncc-history.json.xz",
                  startup_script=".lnccrc.txt")
    sys.exit(app.cmdloop())