Commit fbe01152 authored by Mark Canterbury's avatar Mark Canterbury
Browse files

Ruff!

parent c28dfd8a
Loading
Loading
Loading
Loading
+78 −82
Original line number Diff line number Diff line
@@ -6,25 +6,31 @@ from shutil import which

from pycrate_asn1c.asnproc import *


def reconstrainInteger(filename):
    Path('temp.asn').write_text(Path(filename).read_text().replace("18446744073709551615", "65536"))
    return 'temp.asn'
    Path("temp.asn").write_text(
        Path(filename).read_text().replace("18446744073709551615", "65536")
    )
    return "temp.asn"


filesWithBigInts = [
    '102232-1/LI-PS-PDU.asn',
    '102232-3/IPAccessPDU.asn',
    '102232-4/L2AccessPDU.asn'
    "102232-1/LI-PS-PDU.asn",
    "102232-3/IPAccessPDU.asn",
    "102232-4/L2AccessPDU.asn",
]

asn1c_path = ""
change_path_to_unix = False


def fix_path(path):
    if change_path_to_unix:
        return "./" + path.replace("\\", "/")
    else:
        return path


def syntaxCheckASN(fileList):
    """
    Performs ASN syntax checking on a list of filenames (or pathlib Paths)
@@ -40,53 +46,45 @@ def syntaxCheckASN (fileList):
        try:
            if file.as_posix() in filesWithBigInts:
                newFile = reconstrainInteger(str(file))
                p = run([asn1c_path, '-E', fix_path(newFile)], capture_output=True)
                p = run([asn1c_path, "-E", fix_path(newFile)], capture_output=True)
                Path(newFile).unlink()
            else:
                p = run([asn1c_path, '-E', fix_path(str(file))], capture_output=True)
            if (p.returncode != 0):
                p = run([asn1c_path, "-E", fix_path(str(file))], capture_output=True)
            if p.returncode != 0:
                errorMessage = p.stderr.decode().splitlines()[0]
                if errorMessage.startswith('   Value "18446744073709551615" at line'):
                    results[str(file)] = { 'ok' : True}
                    results[str(file)] = {"ok": True}
                    continue
                results[str(file)] = {
                    'ok'   : False,
                    'code' : p.returncode,
                    'message'  : p.stderr.decode().splitlines()[0]
                    "ok": False,
                    "code": p.returncode,
                    "message": p.stderr.decode().splitlines()[0],
                }
            else:
                results[str(file)] = {
                    'ok'   : True
                }            
                results[str(file)] = {"ok": True}
        except Exception as ex:
            raise ex
            results[str(file)] = {
                'ok'   : False,
                'code' : -1,
                'message'  : f"{ex!r}"
            }
            results[str(file)] = {"ok": False, "code": -1, "message": f"{ex!r}"}
    return results


duplicateObjects = {
    '102232-1/LI-PS-PDU.asn' : [
        'CCPayload',
        'IRIPayload',
        'Location'
    ],
    'testing/mod1.asn' : [
        'ClashField'
    ]
    "102232-1/LI-PS-PDU.asn": ["CCPayload", "IRIPayload", "Location"],
    "testing/mod1.asn": ["ClashField"],
}


def fixDuplicateObjects(filename):
    stringContent = filename.read_text()
    for object in duplicateObjects[filename.as_posix()]:
        stringContent = stringContent.replace(f'{object} ::=', f'Native{object} ::=', 1)
        stringContent = stringContent.replace(f'SEQUENCE OF {object}', f'SEQUENCE OF Native{object}')
        stringContent = stringContent.replace(f"{object} ::=", f"Native{object} ::=", 1)
        stringContent = stringContent.replace(
            f"SEQUENCE OF {object}", f"SEQUENCE OF Native{object}"
        )
        # stringContent = sub(f"]\\w{object}", f"] Native{object}", stringContent)

    Path('temp.asn').write_text(stringContent)
    return 'temp.asn'
    Path("temp.asn").write_text(stringContent)
    return "temp.asn"


def compileAllTargets(compileTargets):
@@ -127,19 +125,14 @@ def compileAllTargets (compileTargets):
                logging.debug(f"  Loading {filename}")
            compile_text(fileTexts, filenames=fileNames)
            results[str(firstTarget)] = {
                'ok' : True,
                "ok": True,
            }
        except Exception as ex:
            results[str(firstTarget)] = {
                'ok'   : False,
                'code' : -1,
                'message'  : f"{ex!r}"
            }
            results[str(firstTarget)] = {"ok": False, "code": -1, "message": f"{ex!r}"}
            continue
    return results



def processResults(results, stageName):
    """
    Counts the number of errors and writes out the output per filename
@@ -149,7 +142,7 @@ def processResults (results, stageName):
    :returns: The number of files which had errors
    """
    print("")
    errorCount = sum([1 for r in results.values() if not r['ok']])
    errorCount = sum([1 for r in results.values() if not r["ok"]])
    logging.info(f"{errorCount} {stageName} errors encountered")

    print(f"{'-':-<60}")
@@ -157,9 +150,9 @@ def processResults (results, stageName):
    print(f"{'-':-<60}")
    for filename, result in results.items():
        print(f" {filename:.<55}{'..OK' if result['ok'] else 'FAIL'}")
        if not result['ok']:
            if isinstance(result['message'], list):
                for thing in result['message']:
        if not result["ok"]:
            if isinstance(result["message"], list):
                for thing in result["message"]:
                    print(f"    {thing['message']}")
            else:
                print(f"    {result['message']}")
@@ -171,40 +164,41 @@ def processResults (results, stageName):
    return errorCount


if __name__ == '__main__':
    logging.info ('Searching for ASN1C')
if __name__ == "__main__":
    logging.info("Searching for ASN1C")
    asn1c_path = which("asn1c")
    if asn1c_path is None:
        raise Exception("No asn1c executable found. Please install asn1c")
    logging.info(f"asn1c found at {asn1c_path}")
    if asn1c_path.lower().endswith("bat"):
        logging.info (f"asn1c is a batch file, so assume path separators need to be changed")
        logging.info(
            f"asn1c is a batch file, so assume path separators need to be changed"
        )
        change_path_to_unix = True


    logging.info('Searching for ASN.1 files')
    logging.info("Searching for ASN.1 files")
    fileList = list(Path(".").rglob("*.asn1")) + list(Path(".").rglob("*.asn"))
    logging.info(f'{len(fileList)} ASN.1 files found')
    logging.info(f"{len(fileList)} ASN.1 files found")
    for file in fileList:
        logging.debug(f'  {file}')
        logging.debug(f"  {file}")

    ignoreList = Path('testing/asn/asn_ignore.txt').read_text().splitlines()
    ignoreList = Path("testing/asn/asn_ignore.txt").read_text().splitlines()
    ignoredFiles = []
    for ignore in ignoreList:
        logging.debug(f'Ignoring pattern {ignore}')
        logging.debug(f"Ignoring pattern {ignore}")
        for file in fileList:
            if ignore in str(file):
                ignoredFiles.append(file)
                logging.debug(f" Ignoring {str(file)} as contains {ignore}")
    ignoredFiles = list(set(ignoredFiles))
    logging.info(f'{len(ignoredFiles)} files ignored')
    logging.info(f"{len(ignoredFiles)} files ignored")
    for file in ignoredFiles:
        logging.debug(f'  {file}')
        logging.debug(f"  {file}")

    fileList = [file for file in fileList if file not in ignoredFiles]
    logging.info(f'{len(fileList)} files to process')
    logging.info(f"{len(fileList)} files to process")
    for file in fileList:
        logging.debug(f'  {file}')
        logging.debug(f"  {file}")

    if len(fileList) == 0:
        logging.warning("No files specified")
@@ -216,7 +210,9 @@ if __name__ == '__main__':
        exit(-1)

    logging.info("Getting compile targets")
    compileTargets = json.loads(Path('testing/asn/asn_compile_targets.json').read_text())
    compileTargets = json.loads(
        Path("testing/asn/asn_compile_targets.json").read_text()
    )
    logging.info(f"{len(compileTargets)} compile targets found")

    compileResults = compileAllTargets(compileTargets)
+25 −22
Original line number Diff line number Diff line
@@ -7,9 +7,8 @@ from colorama import Fore, Style

colorama.init()

ignore_paths = [Path(x) for x in [
    'testing/deps'
]]
ignore_paths = [Path(x) for x in ["testing/deps"]]


def print_colorized_diff_line(line: str):
    if line.startswith("-"):
@@ -19,8 +18,11 @@ def print_colorized_diff_line(line: str):
    else:
        print(line)


def lint(file: Path):
    completed = subprocess.run(["jq", ".", str(file)], capture_output=True, text=True, encoding="utf8")
    completed = subprocess.run(
        ["jq", ".", str(file)], capture_output=True, text=True, encoding="utf8"
    )

    if completed.returncode != 0:
        print(f"  {str(f)}: FAIL")
@@ -45,7 +47,6 @@ def lint(file : Path):
        return len(diff)



if __name__ == "__main__":
    root = Path("./")
    files = list(root.rglob("*.json"))
@@ -65,14 +66,16 @@ if __name__ == "__main__":
        files_with_errors += 1 if new_errors > 0 else 0

    print("───────────────────────────────────────────────────────────────────")
    print (f"Files: {len(files)} ({files_with_errors} with errors)  Total errors: {errors}")
    if (files_with_errors == 0):
    print(
        f"Files: {len(files)} ({files_with_errors} with errors)  Total errors: {errors}"
    )
    if files_with_errors == 0:
        print("✅ OK")
    else:
        print("❌ Fail")
    print("───────────────────────────────────────────────────────────────────")

    if (files_with_errors > 0):
    if files_with_errors > 0:
        exit(-1)
    else:
        exit(0)
+25 −22
Original line number Diff line number Diff line
@@ -7,9 +7,8 @@ from colorama import Fore, Style

colorama.init()

ignore_paths = [Path(x) for x in [
    'testing/deps'
]]
ignore_paths = [Path(x) for x in ["testing/deps"]]


def print_colorized_diff_line(line: str):
    if line.startswith("-"):
@@ -19,8 +18,11 @@ def print_colorized_diff_line(line: str):
    else:
        print(line)


def lint(file: Path):
    completed = subprocess.run(["xmllint", str(file)], capture_output=True, text=True, encoding="utf8")
    completed = subprocess.run(
        ["xmllint", str(file)], capture_output=True, text=True, encoding="utf8"
    )

    if completed.returncode != 0:
        print(f"  {str(f)}: FAIL")
@@ -43,7 +45,6 @@ def lint(file : Path):
        return len(diff)



if __name__ == "__main__":
    root = Path("./")
    files = list(root.rglob("*.xml")) + list(root.rglob("*.xsd"))
@@ -63,14 +64,16 @@ if __name__ == "__main__":
        files_with_errors += 1 if new_errors > 0 else 0

    print("───────────────────────────────────────────────────────────────────")
    print (f"Files: {len(files)} ({files_with_errors} with errors)  Total errors: {errors}")
    if (files_with_errors == 0):
    print(
        f"Files: {len(files)} ({files_with_errors} with errors)  Total errors: {errors}"
    )
    if files_with_errors == 0:
        print("✅ OK")
    else:
        print("❌ Fail")
    print("───────────────────────────────────────────────────────────────────")

    if (files_with_errors > 0):
    if files_with_errors > 0:
        exit(-1)
    else:
        exit(0)
+17 −4
Original line number Diff line number Diff line
@@ -11,8 +11,19 @@ import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)')
    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        help="Verbose logging (can be specified multiple times)",
    )
    parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="Path to input file (if absent, stdin is used)",
    )
    args = parser.parse_args()

    match args.verbose:
@@ -31,5 +42,7 @@ if __name__ == "__main__":
    logging.debug(s)
    j = json.loads(s)

    xml = xmltodict.unparse({'HI1Message' : j}, )
    xml = xmltodict.unparse(
        {"HI1Message": j},
    )
    print(xml)
+139 −54
Original line number Diff line number Diff line
@@ -7,10 +7,11 @@ import logging
import argparse
from itertools import chain


class JsonValidator:
    def __init__(self, core_schema: str, other_schemas: dict):
        self._core_schema = json.load(Path(core_schema).open())
        self._schema_dict = { self._core_schema['$id'] : self._core_schema }
        self._schema_dict = {self._core_schema["$id"]: self._core_schema}
        self._supporting_paths = []
        for thing in other_schemas:
            path = Path(thing)
@@ -22,45 +23,78 @@ class JsonValidator:
                self._supporting_paths.append(path)
        logging.info(f"Supporting schema paths: {self._supporting_paths}")
        self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths]
        self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas }
        self._schema_dict = self._schema_dict | {
            s["$id"]: s for s in self._supporting_schemas
        }
        logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}")
        self._resolver = RefResolver(None, 
                        referrer=None, 
                        store=self._schema_dict)
        self._resolver = RefResolver(None, referrer=None, store=self._schema_dict)
        logging.info("Created RefResolver")
        self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver)
        self._validator = Draft202012Validator(
            self._core_schema, resolver=self._resolver
        )
        logging.info("Created validator")

    def validate(self, instance_doc: str):
        errors = list(self._validator.iter_errors(instance_doc))
        return errors


class TS103120Validator(JsonValidator):
    def __init__(self, path_to_repo):
        repo_path = Path(path_to_repo)
        schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")]
        core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json")
        JsonValidator.__init__(self, core_schema, schema_dirs)
        request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" }  
        self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver)
        response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" }  
        self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver)
        request_fragment_schema = {
            "$ref": "ts_103120_Core_2019_10#/$defs/RequestPayload"
        }
        self._request_fragment_validator = Draft202012Validator(
            request_fragment_schema, resolver=self._resolver
        )
        response_fragment_schema = {
            "$ref": "ts_103120_Core_2019_10#/$defs/ResponsePayload"
        }
        self._response_fragment_validator = Draft202012Validator(
            response_fragment_schema, resolver=self._resolver
        )

    def expand_request_response_exception(self, ex):
        if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']:
            logging.info ("Error detected validating payload oneOf - attempting explicit validation...")
            if 'RequestPayload' in instance_doc['Payload'].keys():
                ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])]))
        if list(ex.schema_path) == ["properties", "Payload", "oneOf"]:
            logging.info(
                "Error detected validating payload oneOf - attempting explicit validation..."
            )
            if "RequestPayload" in instance_doc["Payload"].keys():
                ret_list = list(
                    chain(
                        *[
                            self.expand_action_exception(x)
                            for x in self._request_fragment_validator.iter_errors(
                                instance_doc["Payload"]["RequestPayload"]
                            )
                        ]
                    )
                )
                for r in ret_list:
                    r.path = ex.path + r.path
                return ret_list
            elif 'ResponsePayload' in instance_doc['Payload'].keys():
                ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])]))
            elif "ResponsePayload" in instance_doc["Payload"].keys():
                ret_list = list(
                    chain(
                        *[
                            self.expand_action_exception(x)
                            for x in self._request_fragment_validator.iter_errors(
                                instance_doc["Payload"]["ResponsePayload"]
                            )
                        ]
                    )
                )
                for r in ret_list:
                    r.path = ex.path + r.path
                return ret_list
            else:
                logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
                logging.error(
                    "No RequestPayload or ResponsePayload found - is the Payload malformed?"
                )
                return [ex]
        else:
            return [ex]
@@ -68,15 +102,40 @@ class TS103120Validator (JsonValidator):
    def expand_action_exception(self, ex):
        logging.error("Error detected in ActionRequests/ActionResponses")
        error_path = list(ex.schema_path)
        if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']:
        if error_path != [
            "properties",
            "ActionRequests",
            "properties",
            "ActionRequest",
            "items",
            "allOf",
            1,
            "oneOf",
        ] and error_path != [
            "properties",
            "ActionResponses",
            "properties",
            "ActionResponse",
            "items",
            "allOf",
            1,
            "oneOf",
        ]:
            logging.error("Error not in inner Request/Response allOf/oneOf constraint")
            return [ex]
        j = ex.instance
        j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb
        j.pop(
            "ActionIdentifier"
        )  # Remove ActionIdentifier - one remaining key will be the verb
        verb = list(j.keys())[0]
        message = "Request" if error_path[1] == "ActionRequests" else "Response"
        v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver)
        ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])]))
        v = Draft202012Validator(
            {"$ref": f"ts_103120_Core_2019_10#/$defs/{verb}{message}"},
            resolver=self._resolver,
        )
        ret_list = list(
            chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])])
        )
        for r in ret_list:
            r.path = ex.path + r.path
        return ret_list
@@ -84,40 +143,66 @@ class TS103120Validator (JsonValidator):
    def expand_object_exception(self, ex):
        logging.error("Error detected in verb")
        # The final level of validation is for the actual HI1Object validation
        if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']:
        if list(ex.schema_path) != ["properties", "HI1Object", "oneOf"]:
            logging.error("Error not inside HI1Object")
            return [ex]
        object_type = ex.instance['@xsi:type'].split('}')[-1]
        object_type = ex.instance["@xsi:type"].split("}")[-1]
        object_ref = {
            'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
            'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
            'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
            'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
            'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
            'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
            'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
            'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
            'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
            "AuthorisationObject": "ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject",
            "LITaskObject": "ts_103120_Task_2020_09#/$defs/LITaskObject",
            "LDTaskObject": "ts_103120_Task_2020_09#/$defs/LDTaskObject",
            "LPTaskObject": "ts_103120_Task_2020_09#/$defs/LPTaskObject",
            "DocumentObject": "ts_103120_Document_2020_09#/$defs/DocumentObject",
            "NotificationObject": "ts_103120_Notification_2016_02#/$defs/NotificationObject",
            "DeliveryObject": "ts_103120_Delivery_2019_10#/$defs/DeliveryObject",
            "TrafficPolicyObject": "ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject",
            "TrafficRuleObject": "ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject",
        }[object_type]
        v = Draft202012Validator({"$ref": object_ref}, resolver=self._resolver)
        return list(v.iter_errors(ex.instance))

    def validate(self, instance_doc: str):
        errors = JsonValidator.validate(self, instance_doc)
        out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors]))
        out_errors = list(
            chain(*[self.expand_request_response_exception(ex) for ex in errors])
        )
        return out_errors



if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
    parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
    parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document")
    parser.add_argument('--schema', default=None, help="Primary schema to validate against")    
    parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)")
    parser.add_argument(
        "-s",
        "--schemadir",
        action="append",
        help="Directory containing supporting schema files to use for validation",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        help="Verbose logging (can be specified multiple times)",
    )
    parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="Path to input file (if absent, stdin is used)",
    )
    parser.add_argument(
        "--ts103120", action="store_true", help="Validate a TS 103 120 JSON document"
    )
    parser.add_argument(
        "--schema", default=None, help="Primary schema to validate against"
    )
    parser.add_argument(
        "-p",
        "--printerror",
        action="count",
        help="Controls how verbose validation error printing is (can be specified multiple times)",
    )
    args = parser.parse_args()

    match args.verbose:
@@ -130,7 +215,7 @@ if __name__ == "__main__":

    logging.debug(f"Arguments: {args}")

    if (args.ts103120):
    if args.ts103120:
        v = TS103120Validator("./")
    else:
        v = JsonValidator(args.schema, args.schemadir)
Loading