From 2d68c59effa6f53744a49a2b84589157b5ef8441 Mon Sep 17 00:00:00 2001 From: vanschelts Date: Tue, 25 Feb 2025 16:45:10 +0000 Subject: [PATCH 1/9] Initial round of Python formatting --- .gitlab-ci.yml | 6 + 103705/validate.py | 52 ++-- 103705/validation/validate_705.py | 89 ++++--- 103707/testing/validate_examples.py | 29 +-- create_attachments.py | 4 +- pyproject.toml | 2 + requirements.txt | 2 + testing/asn_process.py | 171 +++++++------ testing/merge_test.py | 52 ++-- testing/xsd_process.py | 193 +++++++++------ utils/json_to_xml.py | 21 +- utils/json_validator.py | 224 +++++++++++++----- utils/sign_json.py | 52 ++-- utils/translate/ChoiceMapping.py | 39 +-- utils/translate/ComplexTypeMapping.py | 5 +- utils/translate/SequenceMapping.py | 65 ++--- utils/translate/SimpleTypeMapping.py | 5 +- utils/translate/TypeMapping.py | 56 ++--- utils/translate/XSDNativeSimpleTypeMapping.py | 60 +++-- utils/translate/__init__.py | 31 +-- utils/translate_spec.py | 113 +++++---- utils/verify_json.py | 48 ++-- utils/xml_to_json.py | 151 ++++++------ 23 files changed, 892 insertions(+), 578 deletions(-) create mode 100644 pyproject.toml create mode 100644 requirements.txt diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 4c0fc308..82eb0f9c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -19,6 +19,12 @@ preflight: script: - forgelib-preflight https://$CI_SERVER_HOST $CI_PROJECT_ID $CI_MERGE_REQUEST_IID +check_py: + image: "forge.etsi.org:5050/li/schemas-definitions/forgelib" + stage: check + script: + - ruff format --check . + process_asn: image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest" stage: check diff --git a/103705/validate.py b/103705/validate.py index fa549097..42c8935d 100644 --- a/103705/validate.py +++ b/103705/validate.py @@ -6,10 +6,9 @@ import logging import argparse - # filename = sys.argv[1] -# def load_json (path): +# def load_json (path): # with open(path) as f: # s = json.load(f) # return s @@ -38,20 +37,39 @@ import argparse # validate(json_instance, ext_schema) # print ("OK") + def handle_uri(u): print(u) -def load_json(path : str): + +def load_json(path: str): with open(path) as f: return json.load(f) + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") - parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") - parser.add_argument('schema', help="Primary schema to validate against") + parser.add_argument( + "-s", + "--schemadir", + action="append", + help="Directory containing supporting schema files to use for validation", + ) + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Verbose logging (can be specified multiple times)", + ) + parser.add_argument( + "-i", + "--input", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to input file (if absent, stdin is used)", + ) + parser.add_argument("schema", help="Primary schema to validate against") args = parser.parse_args() @@ -67,29 +85,27 @@ if __name__ == "__main__": instance_doc = json.loads(args.input.read()) args.input.close() - main_schema = load_json(args.schema) - schema_dict = { main_schema['$id'] : main_schema } + main_schema = load_json(args.schema) + schema_dict = {main_schema["$id"]: main_schema} if args.schemadir: - 
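        # Recursively collect every *.schema.json file beneath each supplied
        # schema directory; these supporting schemas are what $ref targets
        # in the primary schema will be resolved against.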
schema_paths = [] + schema_paths = [] for d in args.schemadir: schema_paths += [f for f in Path(d).rglob("*.schema.json")] logging.info(f"Schema files loaded: {schema_paths}") schemas_json = [json.load(p.open()) for p in schema_paths] - schema_dict = schema_dict | { s['$id'] : s for s in schemas_json } + schema_dict = schema_dict | {s["$id"]: s for s in schemas_json} logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}") - logging.debug (f"Instance doc: {instance_doc}") - logging.debug (f"Main schema: {main_schema}") + logging.debug(f"Instance doc: {instance_doc}") + logging.debug(f"Main schema: {main_schema}") + + resolver = RefResolver(None, referrer=None, store=schema_dict) - resolver = RefResolver(None, - referrer=None, - store=schema_dict) - v = Draft202012Validator(main_schema, resolver=resolver) v.validate(instance_doc) - logging.info("Done") \ No newline at end of file + logging.info("Done") diff --git a/103705/validation/validate_705.py b/103705/validation/validate_705.py index bb6913d2..51314f63 100644 --- a/103705/validation/validate_705.py +++ b/103705/validation/validate_705.py @@ -5,10 +5,12 @@ from pathlib import Path import logging import argparse + def handle_uri(u): print(u) -def load_json(path : str): + +def load_json(path: str): with open(path) as f: return json.load(f) @@ -16,9 +18,25 @@ def load_json(path : str): if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") - parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + parser.add_argument( + "-s", + "--schemadir", + action="append", + help="Directory containing supporting schema files to use for validation", + ) + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Verbose logging (can be specified multiple times)", + ) + parser.add_argument( + "-i", + "--input", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to input file (if absent, stdin is used)", + ) args = parser.parse_args() @@ -36,20 +54,22 @@ if __name__ == "__main__": args.input.close() config = { - 'schema_include_dirs' : [ - '../schema/', - '../../103280/', + "schema_include_dirs": [ + "../schema/", + "../../103280/", ], - 'main_schema_doc' : '../schema/response.schema.json' + "main_schema_doc": "../schema/response.schema.json", } rootPath = Path(sys.argv[0]).parent - main_schema = load_json(str(rootPath / config['main_schema_doc'])) - schema_dict = { main_schema['$id'] : main_schema } - - schema_paths = [] - for d in config['schema_include_dirs']: - schema_paths += [f for f in (rootPath / Path(d)).rglob("*.schema.json")] + main_schema = load_json(str(rootPath / config["main_schema_doc"])) + schema_dict = {main_schema["$id"]: main_schema} + + schema_paths = [] + for d in config["schema_include_dirs"]: + schema_paths += [ + f for f in (rootPath / Path(d)).rglob("*.schema.json") + ] logging.info(f"Core schema files loaded: {schema_paths}") if args.schemadir: for d in args.schemadir: @@ -58,40 +78,47 @@ if __name__ == "__main__": else: logging.info(f"No CSP schema files loaded") schemas_json = [json.load(p.open()) for p in schema_paths] - schema_dict = schema_dict | { s['$id'] : s for s in schemas_json } + schema_dict = schema_dict | {s["$id"]: s for s in schemas_json} 
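    # At this point schema_dict maps each schema's "$id" to its parsed JSON
    # document. Handing it to RefResolver as a pre-populated store means every
    # cross-schema $ref resolves locally, with no network retrieval.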
logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}") - logging.debug (f"Instance doc: {instance_doc}") - logging.debug (f"Main schema: {main_schema}") + logging.debug(f"Instance doc: {instance_doc}") + logging.debug(f"Main schema: {main_schema}") + + resolver = RefResolver(None, referrer=None, store=schema_dict) - resolver = RefResolver(None, - referrer=None, - store=schema_dict) - logging.info("Performing ETSI validation") v = Draft202012Validator(main_schema, resolver=resolver) v.validate(instance_doc) logging.info("Building record type dictionary") - type_dict = instance_doc['recordSetDescription']['recordTypes'] + type_dict = instance_doc["recordSetDescription"]["recordTypes"] logging.debug(type_dict) - ref_dict = { k : {"$ref" : v} for k,v in type_dict.items()} - validator_dict = { k : Draft202012Validator(ref_dict[k], resolver=resolver) for k,v in ref_dict.items()} + ref_dict = {k: {"$ref": v} for k, v in type_dict.items()} + validator_dict = { + k: Draft202012Validator(ref_dict[k], resolver=resolver) + for k, v in ref_dict.items() + } logging.debug(ref_dict) logging.info("Validating records") - for r in instance_doc['recordSet']: - type_key = r['type'] + for r in instance_doc["recordSet"]: + type_key = r["type"] if type_key not in type_dict.keys(): - logging.error(f"Record {r['id']} has type {type_key}, not in recordType dict") + logging.error( + f"Record {r['id']} has type {type_key}, not in recordType dict" + ) type_ref = type_dict[type_key] - type_schema_id = type_ref.split('#')[0] - logging.info(f"Using {type_schema_id} to validate {type_ref} in record {r['id']}") + type_schema_id = type_ref.split("#")[0] + logging.info( + f"Using {type_schema_id} to validate {type_ref} in record {r['id']}" + ) if not (type_key in validator_dict.keys()): - logging.error(f'Type key {type_key} from type {type_ref} in record {r["id"]} not in validator dictionary') + logging.error( + f"Type key {type_key} from type {type_ref} in record {r['id']} not in validator dictionary" + ) print(ref_dict) v = validator_dict[type_key] v.validate(r) - logging.info("Done") \ No newline at end of file + logging.info("Done") diff --git a/103707/testing/validate_examples.py b/103707/testing/validate_examples.py index fbcf21f2..7cb29600 100644 --- a/103707/testing/validate_examples.py +++ b/103707/testing/validate_examples.py @@ -3,44 +3,41 @@ import sys from pathlib import Path from pprint import pprint -if __name__ == '__main__': - +if __name__ == "__main__": if sys.version_info <= (3, 5): - sys.exit('ERROR: You need at least Python 3.5 to run this tool') + sys.exit("ERROR: You need at least Python 3.5 to run this tool") try: from lxml import etree except ImportError: - sys.exit('ERROR: You need to install the Python lxml library') + sys.exit("ERROR: You need to install the Python lxml library") try: import xmlschema except ImportError: - sys.exit('ERROR: You need to install the xml schema library') - + sys.exit("ERROR: You need to install the xml schema library") - extraSchemas = [ - 'examples/FooServiceSchema.xsd', - 'TS_103_280_v020301.xsd' - ] + extraSchemas = ["examples/FooServiceSchema.xsd", "TS_103_280_v020301.xsd"] locations = [] for schemaFile in extraSchemas: - xs = xmlschema.XMLSchema(schemaFile, validation='skip') + xs = xmlschema.XMLSchema(schemaFile, validation="skip") locations.append((xs.default_namespace, str(Path(schemaFile)))) - coreSchema = xmlschema.XMLSchema('TS_103_707_v010201.xsd', locations=locations) + coreSchema = xmlschema.XMLSchema( + "TS_103_707_v010201.xsd", 
locations=locations + ) for schema in extraSchemas: newSchema = xmlschema.XMLSchema(schema) coreSchema.import_schema(newSchema.default_namespace, schema) - examples = glob.glob('examples/*.xml') + examples = glob.glob("examples/*.xml") for example in examples: try: coreSchema.validate(example) - print ("{0} passed validation".format(example)) + print("{0} passed validation".format(example)) except Exception as ex: - print ("{0} failed validation: {1}".format(example, ex)) + print("{0} failed validation: {1}".format(example, ex)) - print ('Done') \ No newline at end of file + print("Done") diff --git a/create_attachments.py b/create_attachments.py index 9f5f0264..e3a84108 100644 --- a/create_attachments.py +++ b/create_attachments.py @@ -46,7 +46,9 @@ def recursively_zip_directory(directory: Path, zipname: str, recursion=0): elif f.is_dir(): zipname = f.with_suffix(".zip").name logging.info(f"{'':{recursion * 4}}Adding archive: {f}") - recurse_buffer = recursively_zip_directory(f, zipname, recursion + 1) + recurse_buffer = recursively_zip_directory( + f, zipname, recursion + 1 + ) zip.writestr(zipname, recurse_buffer.getvalue()) return buffer diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..74a76e8a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,2 @@ +[tool.ruff] +line-length = 79 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..1557a360 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +xmlschema==1.10.0 +ruff==0.9.7 diff --git a/testing/asn_process.py b/testing/asn_process.py index 8c4394cd..74209c55 100644 --- a/testing/asn_process.py +++ b/testing/asn_process.py @@ -6,26 +6,32 @@ from shutil import which from pycrate_asn1c.asnproc import * -def reconstrainInteger (filename): - Path('temp.asn').write_text(Path(filename).read_text().replace("18446744073709551615", "65536")) - return 'temp.asn' + +def reconstrainInteger(filename): + Path("temp.asn").write_text( + Path(filename).read_text().replace("18446744073709551615", "65536") + ) + return "temp.asn" + filesWithBigInts = [ - '102232-1/LI-PS-PDU.asn', - '102232-3/IPAccessPDU.asn', - '102232-4/L2AccessPDU.asn' + "102232-1/LI-PS-PDU.asn", + "102232-3/IPAccessPDU.asn", + "102232-4/L2AccessPDU.asn", ] asn1c_path = "" change_path_to_unix = False + def fix_path(path): if change_path_to_unix: - return "./" + path.replace("\\","/") + return "./" + path.replace("\\", "/") else: return path -def syntaxCheckASN (fileList): + +def syntaxCheckASN(fileList): """ Performs ASN syntax checking on a list of filenames (or pathlib Paths) @@ -40,56 +46,61 @@ def syntaxCheckASN (fileList): try: if file.as_posix() in filesWithBigInts: newFile = reconstrainInteger(str(file)) - p = run([asn1c_path, '-E', fix_path(newFile)], capture_output=True) + p = run( + [asn1c_path, "-E", fix_path(newFile)], capture_output=True + ) Path(newFile).unlink() else: - p = run([asn1c_path, '-E', fix_path(str(file))], capture_output=True) - if (p.returncode != 0): + p = run( + [asn1c_path, "-E", fix_path(str(file))], + capture_output=True, + ) + if p.returncode != 0: errorMessage = p.stderr.decode().splitlines()[0] - if errorMessage.startswith(' Value "18446744073709551615" at line'): - results[str(file)] = { 'ok' : True} + if errorMessage.startswith( + ' Value "18446744073709551615" at line' + ): + results[str(file)] = {"ok": True} continue results[str(file)] = { - 'ok' : False, - 'code' : p.returncode, - 'message' : p.stderr.decode().splitlines()[0] + "ok": False, + "code": p.returncode, + "message": 
p.stderr.decode().splitlines()[0],
                }
            else:
                results[str(file)] = {"ok": True}
        except Exception as ex:
            # Record the failure so processResults can report it; re-raising
            # here would make the entry below unreachable.
            results[str(file)] = {
                "ok": False,
                "code": -1,
                "message": f"{ex!r}",
            }

    return results


duplicateObjects = {
    "102232-1/LI-PS-PDU.asn": ["CCPayload", "IRIPayload", "Location"],
    "testing/mod1.asn": ["ClashField"],
}


def fixDuplicateObjects(filename):
    stringContent = filename.read_text()
    for object in duplicateObjects[filename.as_posix()]:
        stringContent = stringContent.replace(
            f"{object} ::=", f"Native{object} ::="
        )
        stringContent = stringContent.replace(
            f"SEQUENCE OF {object}", f"SEQUENCE OF Native{object}"
        )
        # stringContent = sub(f"]\\w{object}", f"] Native{object}", stringContent)

    Path("temp.asn").write_text(stringContent)
    return "temp.asn"


def compileAllTargets(compileTargets):
    """
    Attempts to compile a set of compile targets using the pycrate ASN1 tools

    :param compileTargets: list of compile targets
    :returns: dict of per-target results, keyed by the primary filename

    Each compile target is a list of filenames. The first filename is assumed
    to be the "primary" file. This doesn't have any relevance to the
    compilation, but will be used as the identifier when reporting any compile
    errors. The compilation is performed by the pycrate ASN compile functions;
    errors are caught as exceptions and rendered into a list.

    Unfortunately, the pycrate compiler doesn't report line numbers.
    The asn1c compiler does, but doesn't properly handle identifiers with the
    same name in different modules; as this occurs multiple times in TS 33.108,
    we can't use it.
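
    As an illustration, a compile target list has the assumed shape below
    (these particular groupings are hypothetical):

        [
            ["102232-1/LI-PS-PDU.asn", "102232-3/IPAccessPDU.asn"],
            ["testing/mod1.asn"]
        ]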
""" @@ -120,107 +131,109 @@ def compileAllTargets (compileTargets): if pFile.as_posix() in duplicateObjects: tmpFile = Path(fixDuplicateObjects(pFile)) fileTexts.append(tmpFile.read_text()) - #tmpFile.unlink() + # tmpFile.unlink() else: fileTexts.append(pFile.read_text()) fileNames.append(filename) - logging.debug (f" Loading {filename}") - compile_text(fileTexts, filenames = fileNames) + logging.debug(f" Loading {filename}") + compile_text(fileTexts, filenames=fileNames) results[str(firstTarget)] = { - 'ok' : True, + "ok": True, } except Exception as ex: results[str(firstTarget)] = { - 'ok' : False, - 'code' : -1, - 'message' : f"{ex!r}" + "ok": False, + "code": -1, + "message": f"{ex!r}", } continue return results - -def processResults (results, stageName): +def processResults(results, stageName): """ Counts the number of errors and writes out the output per filename :param results: List of filenames (str or Pathlib Path) :param stageName: Name to decorate the output with :returns: The number of files which had errors - """ + """ print("") - errorCount = sum([1 for r in results.values() if not r['ok']]) + errorCount = sum([1 for r in results.values() if not r["ok"]]) logging.info(f"{errorCount} {stageName} errors encountered") - + print(f"{'-':-<60}") print(f"{stageName} results:") print(f"{'-':-<60}") for filename, result in results.items(): print(f" {filename:.<55}{'..OK' if result['ok'] else 'FAIL'}") - if not result['ok']: - if isinstance(result['message'], list): - for thing in result['message']: + if not result["ok"]: + if isinstance(result["message"], list): + for thing in result["message"]: print(f" {thing['message']}") else: print(f" {result['message']}") - + print(f"{'-':-<60}") print(f"{stageName} errors: {errorCount}") print(f"{'-':-<60}") - + return errorCount -if __name__ == '__main__': - logging.info ('Searching for ASN1C') +if __name__ == "__main__": + logging.info("Searching for ASN1C") asn1c_path = which("asn1c") if asn1c_path is None: - raise Exception ("No asn1c executable found. Please install asn1c") - logging.info (f"asn1c found at {asn1c_path}") + raise Exception("No asn1c executable found. 
Please install asn1c") + logging.info(f"asn1c found at {asn1c_path}") if asn1c_path.lower().endswith("bat"): - logging.info (f"asn1c is a batch file, so assume path separators need to be changed") + logging.info( + f"asn1c is a batch file, so assume path separators need to be changed" + ) change_path_to_unix = True - - logging.info('Searching for ASN.1 files') + logging.info("Searching for ASN.1 files") fileList = list(Path(".").rglob("*.asn1")) + list(Path(".").rglob("*.asn")) - logging.info(f'{len(fileList)} ASN.1 files found') + logging.info(f"{len(fileList)} ASN.1 files found") for file in fileList: - logging.debug(f' {file}') - - ignoreList = Path('testing/asn_ignore.txt').read_text().splitlines() + logging.debug(f" {file}") + + ignoreList = Path("testing/asn_ignore.txt").read_text().splitlines() ignoredFiles = [] for ignore in ignoreList: - logging.debug(f'Ignoring pattern {ignore}') + logging.debug(f"Ignoring pattern {ignore}") for file in fileList: if ignore in str(file): ignoredFiles.append(file) logging.debug(f" Ignoring {str(file)} as contains {ignore}") ignoredFiles = list(set(ignoredFiles)) - logging.info(f'{len(ignoredFiles)} files ignored') + logging.info(f"{len(ignoredFiles)} files ignored") for file in ignoredFiles: - logging.debug(f' {file}') - + logging.debug(f" {file}") + fileList = [file for file in fileList if file not in ignoredFiles] - logging.info(f'{len(fileList)} files to process') + logging.info(f"{len(fileList)} files to process") for file in fileList: - logging.debug(f' {file}') + logging.debug(f" {file}") if len(fileList) == 0: - logging.warning ("No files specified") + logging.warning("No files specified") exit(0) - + logging.info("Parsing ASN1 files") parseResults = syntaxCheckASN(fileList) if processResults(parseResults, "Parsing") > 0: exit(-1) - logging.info ("Getting compile targets") - compileTargets = json.loads(Path('testing/asn_compile_targets.json').read_text()) - logging.info (f"{len(compileTargets)} compile targets found") + logging.info("Getting compile targets") + compileTargets = json.loads( + Path("testing/asn_compile_targets.json").read_text() + ) + logging.info(f"{len(compileTargets)} compile targets found") compileResults = compileAllTargets(compileTargets) if processResults(compileResults, "Compiling") > 0: exit(-1) - + exit(0) diff --git a/testing/merge_test.py b/testing/merge_test.py index b7a82b39..da9a9025 100644 --- a/testing/merge_test.py +++ b/testing/merge_test.py @@ -8,61 +8,69 @@ crCommitBranch = os.environ.get("CI_COMMIT_REF_NAME", "NOTFOUND") apiUrl = os.environ.get("CI_API_V4_URL", "https://forge.3gpp.org/rep/api/v4") projectId = os.environ.get("CI_PROJECT_ID", "13") -def gapi (query): + +def gapi(query): url = f"{apiUrl}/projects/{projectId}/{query}" r = requests.get(url) return json.loads(r.text) -def do (commandline): - #print (" Attempting: " + commandline) - completedProc = subprocess.run(commandline, capture_output=True, shell=True) - #print (" STDOUT > " + ("empty" if completedProc.stdout is None else completedProc.stdout.decode('utf-8'))) - #print (" STDERR > " + ("empty" if completedProc.stderr is None else completedProc.stderr.decode('utf-8'))) - #print (f" Completed with code {completedProc.returncode}") - return (completedProc.returncode == 0, completedProc.stdout.decode('utf-8')) -print ("Searching for corresponding MR...") +def do(commandline): + # print (" Attempting: " + commandline) + completedProc = subprocess.run( + commandline, capture_output=True, shell=True + ) + # print (" STDOUT > " + ("empty" if 
completedProc.stdout is None else completedProc.stdout.decode('utf-8'))) + # print (" STDERR > " + ("empty" if completedProc.stderr is None else completedProc.stderr.decode('utf-8'))) + # print (f" Completed with code {completedProc.returncode}") + return ( + completedProc.returncode == 0, + completedProc.stdout.decode("utf-8"), + ) + + +print("Searching for corresponding MR...") mrs = gapi(f"merge_requests?source_branch={crCommitBranch}&state=opened") if len(mrs) == 0: - print ("No MR found... aborting") + print("No MR found... aborting") exit() if len(mrs) > 1: - print (f"{len(mrs)} MRs found, 1 expected - aborting") + print(f"{len(mrs)} MRs found, 1 expected - aborting") for m in mrs: pprint.pprint(m) exit(-1) mr = mrs[0] -print (f"Found MR {mr['reference']} ({mr['title']})") -print (f"Target branch is {mr['target_branch']}") -print ("Searching for open MRs targeting same branch...") +print(f"Found MR {mr['reference']} ({mr['title']})") +print(f"Target branch is {mr['target_branch']}") +print("Searching for open MRs targeting same branch...") mrs = gapi(f"merge_requests?target_branch={mr['target_branch']}&state=opened") -mrs = [m for m in mrs if m['reference'] != mr['reference']] -print (f"{len(mrs)} MRs found") +mrs = [m for m in mrs if m["reference"] != mr["reference"]] +print(f"{len(mrs)} MRs found") mergeConflicts = {} for mr in mrs: - source_branch = mr['source_branch'] - print (source_branch) + source_branch = mr["source_branch"] + print(source_branch) try: do(f"git fetch origin {source_branch}:{source_branch}") success, errStr = do(f"git merge --no-commit {source_branch}") if not success: - print ("Merge NOT OK") + print("Merge NOT OK") mergeConflicts[source_branch] = errStr else: - print ("Merge OK") + print("Merge OK") except Exception as ex: mergeConflicts[source_branch] = str(ex) raise finally: do("git merge --abort") -print (f"Merge conflicts with following branches: {mergeConflicts}") -exit(len(mergeConflicts.keys())) \ No newline at end of file +print(f"Merge conflicts with following branches: {mergeConflicts}") +exit(len(mergeConflicts.keys())) diff --git a/testing/xsd_process.py b/testing/xsd_process.py index 97dc2a88..54237781 100644 --- a/testing/xsd_process.py +++ b/testing/xsd_process.py @@ -6,7 +6,7 @@ from xmlschema import etree_tostring from xmlschema import XMLSchema, XMLSchemaParseError -def BuildSchemaDictonary (fileList): +def BuildSchemaDictonary(fileList): if len(fileList) == 0: logging.info("No schema files provided") return [] @@ -15,15 +15,21 @@ def BuildSchemaDictonary (fileList): schemaLocations = [] for schemaFile in fileList: try: - xs = XMLSchema(schemaFile, validation='skip') - schemaLocations.append((xs.default_namespace, str(Path(schemaFile).resolve()))) - logging.info(" [ {0} -> {1} ]".format(xs.default_namespace, schemaFile)) + xs = XMLSchema(schemaFile, validation="skip") + schemaLocations.append( + (xs.default_namespace, str(Path(schemaFile).resolve())) + ) + logging.info( + " [ {0} -> {1} ]".format(xs.default_namespace, schemaFile) + ) except XMLSchemaParseError as ex: - logging.warning (" [ {0} failed to parse: {1} ]".format(schemaFile, ex)) + logging.warning( + " [ {0} failed to parse: {1} ]".format(schemaFile, ex) + ) return schemaLocations -def BuildSchema (coreFile, fileList = None): +def BuildSchema(coreFile, fileList=None): schemaLocations = [] if fileList and len(fileList) > 0: schemaLocations = BuildSchemaDictonary(fileList) @@ -32,38 +38,47 @@ def BuildSchema (coreFile, fileList = None): return coreSchema -def ValidateXSDFiles 
(fileList): +def ValidateXSDFiles(fileList): if len(fileList) == 0: logging.info("No schema files provided") return {} - + schemaLocations = BuildSchemaDictonary(fileList) errors = {} logging.info("Schema validation:") for schemaFile in fileList: try: - schema = XMLSchema(schemaFile, locations = schemaLocations, validation="lax") + schema = XMLSchema( + schemaFile, locations=schemaLocations, validation="lax" + ) logging.info(schemaFile + ": OK") - errors[schemaFile] = [f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}" for e in schema.all_errors] + errors[schemaFile] = [ + f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}" + for e in schema.all_errors + ] except XMLSchemaParseError as ex: - logging.warning(schemaFile + ": Failed validation ({0})".format(ex.message)) + logging.warning( + schemaFile + ": Failed validation ({0})".format(ex.message) + ) if (ex.schema_url) and (ex.schema_url != ex.origin_url): - logging.warning(" Error comes from {0}, suppressing".format(ex.schema_url)) - errors[schemaFile] = [] + logging.warning( + " Error comes from {0}, suppressing".format(ex.schema_url) + ) + errors[schemaFile] = [] else: errors[schemaFile] = [ex] return errors -def ValidateAllXSDFilesInPath (path): +def ValidateAllXSDFilesInPath(path): schemaGlob = [str(f) for f in Path(path).rglob("*.xsd")] return ValidateXSDFiles(schemaGlob) -def ValidateInstanceDocuments (coreFile, supportingSchemas, instanceDocs): +def ValidateInstanceDocuments(coreFile, supportingSchemas, instanceDocs): if (instanceDocs is None) or len(instanceDocs) == 0: - logging.warning ("No instance documents provided") + logging.warning("No instance documents provided") return [] schema = BuildSchema(coreFile, supportingSchemas) @@ -71,44 +86,44 @@ def ValidateInstanceDocuments (coreFile, supportingSchemas, instanceDocs): for instanceDoc in instanceDocs: try: schema.validate(instanceDoc) - logging.info ("{0} passed validation".format(instanceDoc)) + logging.info("{0} passed validation".format(instanceDoc)) except Exception as ex: - logging.error ("{0} failed validation: {1}".format(instanceDoc, ex)) + logging.error("{0} failed validation: {1}".format(instanceDoc, ex)) return errors -def processResults (results, stageName): +def processResults(results, stageName): """ Counts the number of errors and writes out the output per filename :param results: List of filenames (str or Pathlib Path) :param stageName: Name to decorate the output with :returns: The number of files which had errors - """ + """ print("") - errorCount = sum([1 for r in results.values() if not r['ok']]) + errorCount = sum([1 for r in results.values() if not r["ok"]]) logging.info(f"{errorCount} {stageName} errors encountered") - + print(f"{'-':-<60}") print(f"{stageName} results:") print(f"{'-':-<60}") for filename, result in results.items(): print(f" {filename:.<55}{'..OK' if result['ok'] else 'FAIL'}") - if not result['ok']: - if isinstance(result['message'], list): - for thing in result['message']: + if not result["ok"]: + if isinstance(result["message"], list): + for thing in result["message"]: print(f" {thing['message']}") else: print(f" {result['message']}") - + print(f"{'-':-<60}") print(f"{stageName} errors: {errorCount}") print(f"{'-':-<60}") - + return errorCount -def syntaxCheckXSD (fileList): +def syntaxCheckXSD(fileList): results = {} for file in fileList: try: @@ -116,68 +131,91 @@ def syntaxCheckXSD (fileList): schema = XMLSchema(str(file), validation="skip") results[str(file)] = { - 'ok' : len(schema.all_errors) == 0, - 
'message' : None if len(schema.all_errors) == 0 else [{'message' : f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}"} for e in schema.all_errors] + "ok": len(schema.all_errors) == 0, + "message": None + if len(schema.all_errors) == 0 + else [ + { + "message": f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}" + } + for e in schema.all_errors + ], } except XMLSchemaParseError as ex: - logging.warning(str(file) + ": Failed validation ({0})".format(ex.message)) - results[str(file)] = { - 'ok' : False, - 'message' : f"{ex!r}" - } + logging.warning( + str(file) + ": Failed validation ({0})".format(ex.message) + ) + results[str(file)] = {"ok": False, "message": f"{ex!r}"} return results -if __name__ == '__main__': - #logging.basicConfig(level=logging.DEBUG) +if __name__ == "__main__": + # logging.basicConfig(level=logging.DEBUG) - compileTargets = json.loads(Path('testing/xsd_compile_targets.json').read_text()) + compileTargets = json.loads( + Path("testing/xsd_compile_targets.json").read_text() + ) results = {} for target in compileTargets: - coreFile = target['coreSchema'] + coreFile = target["coreSchema"] logging.info(f"Attempting to compile {coreFile}") schemaLocations = [] - for supportSchema in target['supportingSchemas']: + for supportSchema in target["supportingSchemas"]: logging.debug(f"Adding supporting schema {supportSchema}") try: - xs = XMLSchema(supportSchema, validation='skip') - schemaLocations.append((xs.target_namespace, str(Path(supportSchema).resolve()))) - logging.info(" [ {0} -> {1} ]".format(xs.default_namespace, supportSchema)) + xs = XMLSchema(supportSchema, validation="skip") + schemaLocations.append( + (xs.target_namespace, str(Path(supportSchema).resolve())) + ) + logging.info( + " [ {0} -> {1} ]".format( + xs.default_namespace, supportSchema + ) + ) except Exception as ex: - logging.warning (" [ {0} exception parsing: {1} ]".format(supportSchema, ex)) - results[coreFile] = { - 'ok' : False, - 'message' : f"{ex!r}" - } + logging.warning( + " [ {0} exception parsing: {1} ]".format( + supportSchema, ex + ) + ) + results[coreFile] = {"ok": False, "message": f"{ex!r}"} break try: - schema = XMLSchema(coreFile, locations = schemaLocations, validation="strict") + schema = XMLSchema( + coreFile, locations=schemaLocations, validation="strict" + ) results[coreFile] = { - 'ok' : len(schema.all_errors) == 0, - 'message' : None if len(schema.all_errors) == 0 else [{'message' : f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}"} for e in schema.all_errors] + "ok": len(schema.all_errors) == 0, + "message": None + if len(schema.all_errors) == 0 + else [ + { + "message": f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}" + } + for e in schema.all_errors + ], } target["schemaInstance"] = schema except Exception as ex: - results[coreFile] = { - 'ok' : False, - 'message' : f"{ex!r}" - } + results[coreFile] = {"ok": False, "message": f"{ex!r}"} continue - - if (processResults(results, "Compile") > 0): + + if processResults(results, "Compile") > 0: exit(-1) - + results = {} for target in compileTargets: schema = target["schemaInstance"] testResults = {} failureCount = 0 - logging.info (f"Validating example {len(target['exampleFiles'])} entries for {target['coreSchema']}") + logging.info( + f"Validating example {len(target['exampleFiles'])} entries for {target['coreSchema']}" + ) for example in target["exampleFiles"]: examplePath = Path(example) if examplePath.is_dir(): - logging.debug (f"Expanding {str(examplePath)}") + 
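                # Entries in exampleFiles may be individual XML documents or
                # directories, which are expanded recursively to every *.xml
                # file beneath them.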
logging.debug(f"Expanding {str(examplePath)}") testFiles = list(examplePath.rglob("./*.xml")) else: testFiles = [examplePath] @@ -186,32 +224,39 @@ if __name__ == '__main__': logging.debug(f"Validating {str(test)} against schema") try: errors = list(schema.iter_errors(str(test))) - testResults[test] = [f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}" for e in errors] + testResults[test] = [ + f"{etree_tostring(e.elem, e.namespaces, ' ', 20)} - {e.message}" + for e in errors + ] failureCount += len(errors) except Exception as ex: - testResults[test] = [f"{ex!r}"] + testResults[test] = [f"{ex!r}"] failureCount += 1 - results[target['coreSchema']] = { - 'ok' : failureCount == 0, - 'testResults' : testResults, - 'failureCount' : failureCount + results[target["coreSchema"]] = { + "ok": failureCount == 0, + "testResults": testResults, + "failureCount": failureCount, } - + print(f"{'-':-<75}") print(f"Validation results:") print(f"{'-':-<75}") totalErrors = 0 for filename, result in results.items(): - if len(result['testResults']) == 0: - print (f"{filename:.<70}SKIP (0)") + if len(result["testResults"]) == 0: + print(f"{filename:.<70}SKIP (0)") continue else: - print (f"{filename:.<70}{'..OK' if result['ok'] else 'FAIL'} ({len(result['testResults'])})") - totalErrors += result['failureCount'] - if result['failureCount'] > 0: - for testFile, testResult in result['testResults'].items(): - print(f" {str(testFile):.<65}{'..OK' if len(testResult) == 0 else 'FAIL'}") + print( + f"{filename:.<70}{'..OK' if result['ok'] else 'FAIL'} ({len(result['testResults'])})" + ) + totalErrors += result["failureCount"] + if result["failureCount"] > 0: + for testFile, testResult in result["testResults"].items(): + print( + f" {str(testFile):.<65}{'..OK' if len(testResult) == 0 else 'FAIL'}" + ) for tr in testResult: print(f" {tr}") diff --git a/utils/json_to_xml.py b/utils/json_to_xml.py index 17764a7a..3f03901c 100644 --- a/utils/json_to_xml.py +++ b/utils/json_to_xml.py @@ -11,8 +11,19 @@ import argparse if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Verbose logging (can be specified multiple times)", + ) + parser.add_argument( + "-i", + "--input", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to input file (if absent, stdin is used)", + ) args = parser.parse_args() match args.verbose: @@ -31,5 +42,7 @@ if __name__ == "__main__": logging.debug(s) j = json.loads(s) - xml = xmltodict.unparse({'HI1Message' : j}, ) - print(xml) \ No newline at end of file + xml = xmltodict.unparse( + {"HI1Message": j}, + ) + print(xml) diff --git a/utils/json_validator.py b/utils/json_validator.py index e9ff0c03..972ace08 100644 --- a/utils/json_validator.py +++ b/utils/json_validator.py @@ -7,10 +7,11 @@ import logging import argparse from itertools import chain + class JsonValidator: - def __init__(self, core_schema: str, other_schemas : dict): + def __init__(self, core_schema: str, other_schemas: dict): self._core_schema = json.load(Path(core_schema).open()) - self._schema_dict = { self._core_schema['$id'] : self._core_schema } + self._schema_dict = {self._core_schema["$id"]: self._core_schema} self._supporting_paths = [] for thing in other_schemas: path = 
Path(thing) @@ -21,103 +22,208 @@ class JsonValidator: logging.debug(f"Appending {path} as schema file") self._supporting_paths.append(path) logging.info(f"Supporting schema paths: {self._supporting_paths}") - self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths] - self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas } - logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}") - self._resolver = RefResolver(None, - referrer=None, - store=self._schema_dict) + self._supporting_schemas = [ + json.load(p.open()) for p in self._supporting_paths + ] + self._schema_dict = self._schema_dict | { + s["$id"]: s for s in self._supporting_schemas + } + logging.info( + f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}" + ) + self._resolver = RefResolver( + None, referrer=None, store=self._schema_dict + ) logging.info("Created RefResolver") - self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver) + self._validator = Draft202012Validator( + self._core_schema, resolver=self._resolver + ) logging.info("Created validator") def validate(self, instance_doc: str): errors = list(self._validator.iter_errors(instance_doc)) return errors - -class TS103120Validator (JsonValidator): - def __init__ (self, path_to_repo): + + +class TS103120Validator(JsonValidator): + def __init__(self, path_to_repo): repo_path = Path(path_to_repo) schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")] - core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json") + core_schema = str( + repo_path / "103120/schema/json/ts_103120_Core.schema.json" + ) JsonValidator.__init__(self, core_schema, schema_dirs) - request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" } - self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver) - response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" } - self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver) - - def expand_request_response_exception (self, ex): - if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']: - logging.info ("Error detected validating payload oneOf - attempting explicit validation...") - if 'RequestPayload' in instance_doc['Payload'].keys(): - ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])])) + request_fragment_schema = { + "$ref": "ts_103120_Core_2019_10#/$defs/RequestPayload" + } + self._request_fragment_validator = Draft202012Validator( + request_fragment_schema, resolver=self._resolver + ) + response_fragment_schema = { + "$ref": "ts_103120_Core_2019_10#/$defs/ResponsePayload" + } + self._response_fragment_validator = Draft202012Validator( + response_fragment_schema, resolver=self._resolver + ) + + def expand_request_response_exception(self, ex): + if list(ex.schema_path) == ["properties", "Payload", "oneOf"]: + logging.info( + "Error detected validating payload oneOf - attempting explicit validation..." 
+ ) + if "RequestPayload" in instance_doc["Payload"].keys(): + ret_list = list( + chain( + *[ + self.expand_action_exception(x) + for x in self._request_fragment_validator.iter_errors( + instance_doc["Payload"]["RequestPayload"] + ) + ] + ) + ) for r in ret_list: r.path = ex.path + r.path return ret_list - elif 'ResponsePayload' in instance_doc['Payload'].keys(): - ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])])) + elif "ResponsePayload" in instance_doc["Payload"].keys(): + ret_list = list( + chain( + *[ + self.expand_action_exception(x) + for x in self._request_fragment_validator.iter_errors( + instance_doc["Payload"]["ResponsePayload"] + ) + ] + ) + ) for r in ret_list: r.path = ex.path + r.path return ret_list else: - logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?") + logging.error( + "No RequestPayload or ResponsePayload found - is the Payload malformed?" + ) return [ex] else: return [ex] - - def expand_action_exception (self, ex): + + def expand_action_exception(self, ex): logging.error("Error detected in ActionRequests/ActionResponses") error_path = list(ex.schema_path) - if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']: - logging.error("Error not in inner Request/Response allOf/oneOf constraint") - return[ex] + if error_path != [ + "properties", + "ActionRequests", + "properties", + "ActionRequest", + "items", + "allOf", + 1, + "oneOf", + ] and error_path != [ + "properties", + "ActionResponses", + "properties", + "ActionResponse", + "items", + "allOf", + 1, + "oneOf", + ]: + logging.error( + "Error not in inner Request/Response allOf/oneOf constraint" + ) + return [ex] j = ex.instance - j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb + j.pop( + "ActionIdentifier" + ) # Remove ActionIdentifier - one remaining key will be the verb verb = list(j.keys())[0] - message = "Request" if error_path[1] == "ActionRequests" else "Response" - v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver) - ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])])) + message = ( + "Request" if error_path[1] == "ActionRequests" else "Response" + ) + v = Draft202012Validator( + {"$ref": f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, + resolver=self._resolver, + ) + ret_list = list( + chain( + *[ + self.expand_object_exception(x) + for x in v.iter_errors(j[verb]) + ] + ) + ) for r in ret_list: r.path = ex.path + r.path return ret_list - - def expand_object_exception (self, ex): + + def expand_object_exception(self, ex): logging.error("Error detected in verb") # The final level of validation is for the actual HI1Object validation - if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']: + if list(ex.schema_path) != ["properties", "HI1Object", "oneOf"]: logging.error("Error not inside HI1Object") return [ex] - object_type = ex.instance['@xsi:type'].split('}')[-1] + object_type = ex.instance["@xsi:type"].split("}")[-1] object_ref = { - 'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject', - 'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject', - 'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject', - 
'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject', - 'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject', - 'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject', - 'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject', - 'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject', - 'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject', + "AuthorisationObject": "ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject", + "LITaskObject": "ts_103120_Task_2020_09#/$defs/LITaskObject", + "LDTaskObject": "ts_103120_Task_2020_09#/$defs/LDTaskObject", + "LPTaskObject": "ts_103120_Task_2020_09#/$defs/LPTaskObject", + "DocumentObject": "ts_103120_Document_2020_09#/$defs/DocumentObject", + "NotificationObject": "ts_103120_Notification_2016_02#/$defs/NotificationObject", + "DeliveryObject": "ts_103120_Delivery_2019_10#/$defs/DeliveryObject", + "TrafficPolicyObject": "ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject", + "TrafficRuleObject": "ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject", }[object_type] - v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver) + v = Draft202012Validator({"$ref": object_ref}, resolver=self._resolver) return list(v.iter_errors(ex.instance)) - + def validate(self, instance_doc: str): errors = JsonValidator.validate(self, instance_doc) - out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors])) + out_errors = list( + chain( + *[self.expand_request_response_exception(ex) for ex in errors] + ) + ) return out_errors - if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") - parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") - parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document") - parser.add_argument('--schema', default=None, help="Primary schema to validate against") - parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)") + parser.add_argument( + "-s", + "--schemadir", + action="append", + help="Directory containing supporting schema files to use for validation", + ) + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Verbose logging (can be specified multiple times)", + ) + parser.add_argument( + "-i", + "--input", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to input file (if absent, stdin is used)", + ) + parser.add_argument( + "--ts103120", + action="store_true", + help="Validate a TS 103 120 JSON document", + ) + parser.add_argument( + "--schema", default=None, help="Primary schema to validate against" + ) + parser.add_argument( + "-p", + "--printerror", + action="count", + help="Controls how verbose validation error printing is (can be specified multiple times)", + ) args = parser.parse_args() match args.verbose: @@ -130,7 +236,7 @@ if __name__ == "__main__": logging.debug(f"Arguments: {args}") - if (args.ts103120): + if args.ts103120: v = TS103120Validator("./") else: v = JsonValidator(args.schema, args.schemadir) diff --git a/utils/sign_json.py 
b/utils/sign_json.py index 1ce0bba5..96439d46 100644 --- a/utils/sign_json.py +++ b/utils/sign_json.py @@ -1,4 +1,3 @@ - import argparse import logging import sys @@ -8,18 +7,31 @@ from pathlib import Path import json -def insert_sig_block (j): - j['Signature'] = { - 'protected' : '', - 'signature' : '' - } +def insert_sig_block(j): + j["Signature"] = {"protected": "", "signature": ""} return j + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') - parser.add_argument('--pretty', action="store_true", help='Pretty-print the JSON document before signing') - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Verbose logging (can be specified multiple times)", + ) + parser.add_argument( + "--pretty", + action="store_true", + help="Pretty-print the JSON document before signing", + ) + parser.add_argument( + "-i", + "--input", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to input file (if absent, stdin is used)", + ) args = parser.parse_args() match args.verbose: @@ -34,24 +46,22 @@ if __name__ == "__main__": json_text = args.input.read() args.input.close() - + j = json.loads(json_text) j = insert_sig_block(j) - + indent = None if args.pretty: - indent = ' ' + indent = " " presigned_json_text = json.dumps(j, indent=indent) - Path('presigned.json').write_text(presigned_json_text) - presigned_json_bytes = presigned_json_text.encode('utf-8') - - signed = jws.sign(presigned_json_bytes, 'secret_key', algorithm="HS256") - components = signed.split('.') + Path("presigned.json").write_text(presigned_json_text) + presigned_json_bytes = presigned_json_text.encode("utf-8") + + signed = jws.sign(presigned_json_bytes, "secret_key", algorithm="HS256") + components = signed.split(".") - j['Signature']['protected'] = components[0] - j['Signature']['signature'] = components[2] + j["Signature"]["protected"] = components[0] + j["Signature"]["signature"] = components[2] signed_json_text = json.dumps(j, indent=indent) print(signed_json_text) - - diff --git a/utils/translate/ChoiceMapping.py b/utils/translate/ChoiceMapping.py index b477a336..22699dd0 100644 --- a/utils/translate/ChoiceMapping.py +++ b/utils/translate/ChoiceMapping.py @@ -10,38 +10,45 @@ from .ComplexTypeMapping import ComplexTypeMapping log = logging.getLogger() + class ChoiceMapping(ComplexTypeMapping): @classmethod - def process_choice(cls, choice: XsdGroup, current_ns : str, ns_to_id_map): - if choice.model != 'choice': + def process_choice(cls, choice: XsdGroup, current_ns: str, ns_to_id_map): + if choice.model != "choice": raise Exception(f"Wrong group type: {c.model}") oneOf = [] for c in choice.iter_model(): if not (type(c) is XsdElement): - raise Exception (f"Non-element {c} encountered in choice {choice}") + raise Exception( + f"Non-element {c} encountered in choice {choice}" + ) element_name = c.local_name if c.target_namespace in ns_to_id_map: ns = ns_to_id_map[c.target_namespace] - if 'prefix' in ns: - element_name = ns['prefix'] + ":" + element_name + if "prefix" in ns: + element_name = ns["prefix"] + ":" + element_name t = TypeMapping.get_type_from_elem(c, current_ns) - oneOf.append({ - "type" : "object", - "properties" : { - element_name : t - }, - "required" : [element_name] - }) - return oneOf + oneOf.append( + { + "type": "object", 
+ "properties": {element_name: t}, + "required": [element_name], + } + ) + return oneOf - def map(self, xst : BaseXsdType): + def map(self, xst: BaseXsdType): log.debug(f"Attempting mapping of {xst} to choice") j = super().map(xst) if j is None: log.debug("Not a complex type, giving up") return None content = xst.content - if (content.model != 'choice'): + if content.model != "choice": log.debug("Not a choice, giving up") return None - return { 'oneOf' : ChoiceMapping.process_choice(content, xst.namespaces[''], self.ns_to_id_map)} + return { + "oneOf": ChoiceMapping.process_choice( + content, xst.namespaces[""], self.ns_to_id_map + ) + } diff --git a/utils/translate/ComplexTypeMapping.py b/utils/translate/ComplexTypeMapping.py index e1819090..19064e2d 100644 --- a/utils/translate/ComplexTypeMapping.py +++ b/utils/translate/ComplexTypeMapping.py @@ -2,10 +2,9 @@ from xmlschema.validators.complex_types import * from .TypeMapping import TypeMapping + class ComplexTypeMapping(TypeMapping): def map(self, xst: BaseXsdType): if not (type(xst) is XsdComplexType): return None - return { - "type" : "object" - } + return {"type": "object"} diff --git a/utils/translate/SequenceMapping.py b/utils/translate/SequenceMapping.py index 341ac514..96e009b2 100644 --- a/utils/translate/SequenceMapping.py +++ b/utils/translate/SequenceMapping.py @@ -20,24 +20,20 @@ class SequenceMapping(ComplexTypeMapping): log.debug("Not a complex type, giving up") return None content = xst.content - if (content.model != 'sequence'): + if content.model != "sequence": log.debug("Not a sequence, giving up") return None - mapped_type = { - 'type' : 'object', - 'properties' : {}, - 'required' : [] - } + mapped_type = {"type": "object", "properties": {}, "required": []} # Not going to try and do all of this automatically for now # Only make insert the xsiType parameter - if (xst.base_type): + if xst.base_type: # mapped_type['__DESCENDENT_OF__'] = TypeMapping.get_ref_for(xst.base_type, xst.namespaces['']) - mapped_type['properties']['@xsi:type'] = { - "type" : "string", - "enum" : [xst.name] + mapped_type["properties"]["@xsi:type"] = { + "type": "string", + "enum": [xst.name], } - mapped_type['required'].append('@xsi:type') + mapped_type["required"].append("@xsi:type") # if xst.abstract: # mapped_type['__ABSTRACT__'] = True # pass @@ -49,37 +45,46 @@ class SequenceMapping(ComplexTypeMapping): element_name = c.local_name if c.target_namespace in self.ns_to_id_map: ns = self.ns_to_id_map[c.target_namespace] - if 'prefix' in ns: - element_name = ns['prefix'] + ":" + element_name + if "prefix" in ns: + element_name = ns["prefix"] + ":" + element_name if c.effective_max_occurs != 1: - mapped_type['properties'][element_name] = { - "type" : "array", - "items" : TypeMapping.get_type_from_elem(c, xst.namespaces['']) + mapped_type["properties"][element_name] = { + "type": "array", + "items": TypeMapping.get_type_from_elem( + c, xst.namespaces[""] + ), } if c.effective_max_occurs: - mapped_type['properties'][element_name]['maxItems'] = c.effective_max_occurs + mapped_type["properties"][element_name]["maxItems"] = ( + c.effective_max_occurs + ) if c.effective_min_occurs > 0: - mapped_type['properties'][element_name]['minItems'] = c.effective_min_occurs + mapped_type["properties"][element_name]["minItems"] = ( + c.effective_min_occurs + ) else: - mapped_type['properties'][element_name] = TypeMapping.get_type_from_elem(c, xst.namespaces['']) + mapped_type["properties"][element_name] = ( + TypeMapping.get_type_from_elem(c, xst.namespaces[""]) + ) 
if c.effective_min_occurs == 1: - mapped_type['required'].append(element_name) + mapped_type["required"].append(element_name) elif type(c) is XsdGroup: if inner_choice: - raise Exception (f"Second group '{element_name}' encountered in {xst}") + raise Exception( + f"Second group '{element_name}' encountered in {xst}" + ) if c.model != "choice": - raise Exception (f"Don't know what to do with inner group {c} in {xst} - not a choice") - inner_choice = ChoiceMapping.process_choice(c, xst.namespaces[''], self.ns_to_id_map) + raise Exception( + f"Don't know what to do with inner group {c} in {xst} - not a choice" + ) + inner_choice = ChoiceMapping.process_choice( + c, xst.namespaces[""], self.ns_to_id_map + ) elif type(c) is XsdAnyElement: mapped_type = {} else: raise Exception(f"Unknown element type {c}") - if (inner_choice): - return { - 'allOf' : [ - mapped_type, - {'oneOf' : inner_choice} - ] - } + if inner_choice: + return {"allOf": [mapped_type, {"oneOf": inner_choice}]} else: return mapped_type diff --git a/utils/translate/SimpleTypeMapping.py b/utils/translate/SimpleTypeMapping.py index 2e60f9ca..70edfb8c 100644 --- a/utils/translate/SimpleTypeMapping.py +++ b/utils/translate/SimpleTypeMapping.py @@ -7,12 +7,11 @@ from .TypeMapping import TypeMapping log = logging.getLogger() + class SimpleTypeMapping(TypeMapping): def map(self, xst: BaseXsdType): log.debug(f"Attempting mapping of {xst} to simple type") if not (type(xst) is XsdAtomicRestriction): log.debug("Type is not an XsdAtomicRestriction, giving up") return None - return { - "$ref" : xst.base_type.name - } \ No newline at end of file + return {"$ref": xst.base_type.name} diff --git a/utils/translate/TypeMapping.py b/utils/translate/TypeMapping.py index 2b4b785c..e646c892 100644 --- a/utils/translate/TypeMapping.py +++ b/utils/translate/TypeMapping.py @@ -8,63 +8,55 @@ from xmlschema.validators.facets import * log = logging.getLogger() + class TypeMapping(ABC): ns_to_id_map = {} XSD_NS = "http://www.w3.org/2001/XMLSchema" XSD_TYPE_MAP = { - "string" : { "type" : "string" }, - "normalizedString" : { "type" : "string"}, - "dateTime" : { "type" : "string"}, - "token" : { "type" : "string"}, - "anyURI" : { "type" : "string" }, - - "integer" : { "type" : "integer"}, - "nonNegativeInteger" : { "type" : "integer", "minimum" : 0}, - "positiveInteger" : { "type" : "integer", "minimum" : 1}, - - "boolean" : { "type" : "boolean" }, - - "hexBinary" : { "type" : "string", "pattern" : "^([a-fA-F0-9]{2})*$"}, - "base64Binary" : { "type" : "string", "pattern" : "^[A-Za-z0-9+\/]*={0,3}$"}, - - "anyType" : {} - } + "string": {"type": "string"}, + "normalizedString": {"type": "string"}, + "dateTime": {"type": "string"}, + "token": {"type": "string"}, + "anyURI": {"type": "string"}, + "integer": {"type": "integer"}, + "nonNegativeInteger": {"type": "integer", "minimum": 0}, + "positiveInteger": {"type": "integer", "minimum": 1}, + "boolean": {"type": "boolean"}, + "hexBinary": {"type": "string", "pattern": "^([a-fA-F0-9]{2})*$"}, + "base64Binary": { + "type": "string", + "pattern": "^[A-Za-z0-9+\/]*={0,3}$", + }, + "anyType": {}, + } @abstractmethod - def map(self, xst : BaseXsdType): + def map(self, xst: BaseXsdType): return None @classmethod def extract_namespace(cls, qname: str): - match = re.search(r'^\{([^\{\}]+)\}(([^\{\}]+))$', qname) + match = re.search(r"^\{([^\{\}]+)\}(([^\{\}]+))$", qname) if match is None: return None return match.group(1) @classmethod - def get_ref_for(cls, xsd_type: XsdType, current_ns : str): + def get_ref_for(cls, 
xsd_type: XsdType, current_ns: str): ns = cls.extract_namespace(xsd_type.name) if ns == current_ns: - return { "$ref" : f"#/$defs/{xsd_type.local_name}" } + return {"$ref": f"#/$defs/{xsd_type.local_name}"} else: mapped_id = cls.ns_to_id_map[ns] - return { "$ref" : f"{mapped_id['id']}#/$defs/{xsd_type.local_name}"} + return {"$ref": f"{mapped_id['id']}#/$defs/{xsd_type.local_name}"} @classmethod - def get_type_from_elem(cls, elem: XsdElement, current_ns : str): + def get_type_from_elem(cls, elem: XsdElement, current_ns: str): ns = cls.extract_namespace(elem.type.name) - if (ns == TypeMapping.XSD_NS): + if ns == TypeMapping.XSD_NS: # this should be an XSD primitive type return dict(TypeMapping.XSD_TYPE_MAP[elem.type.local_name]) else: return cls.get_ref_for(elem.type, current_ns) - - - - - - - - diff --git a/utils/translate/XSDNativeSimpleTypeMapping.py b/utils/translate/XSDNativeSimpleTypeMapping.py index 772ac10b..81810917 100644 --- a/utils/translate/XSDNativeSimpleTypeMapping.py +++ b/utils/translate/XSDNativeSimpleTypeMapping.py @@ -10,8 +10,8 @@ from .SimpleTypeMapping import SimpleTypeMapping log = logging.getLogger() -class XSDNativeSimpleTypeMapping(SimpleTypeMapping): +class XSDNativeSimpleTypeMapping(SimpleTypeMapping): def map(self, xst: BaseXsdType): log.debug(f"Attempting mapping of {xst} to XSD native type") j = super().map(xst) @@ -25,48 +25,60 @@ class XSDNativeSimpleTypeMapping(SimpleTypeMapping): if mapped_type is None: ns = TypeMapping.extract_namespace(xst.base_type.name) if ns == XSDNativeSimpleTypeMapping.XSD_NS: - print (xst) - print (xst.base_type) - raise Exception (f"No mapping for xs:{xst.base_type.local_name}") + print(xst) + print(xst.base_type) + raise Exception( + f"No mapping for xs:{xst.base_type.local_name}" + ) if len(xst.facets) == 0: - mapped_type = TypeMapping.get_ref_for(xst.base_type, xst.namespaces['']) + mapped_type = TypeMapping.get_ref_for( + xst.base_type, xst.namespaces[""] + ) else: - parent_type = TypeMapping.get_ref_for(xst.base_type, xst.namespaces['']) - mapped_type = TypeMapping.XSD_TYPE_MAP.get(xst.root_type.local_name) + parent_type = TypeMapping.get_ref_for( + xst.base_type, xst.namespaces[""] + ) + mapped_type = TypeMapping.XSD_TYPE_MAP.get( + xst.root_type.local_name + ) if mapped_type is None: - raise Exception (f"Could not find mapping for root type xs:{xst.root_type.local_name}") + raise Exception( + f"Could not find mapping for root type xs:{xst.root_type.local_name}" + ) mapped_type = dict(mapped_type) - + for k, v in xst.facets.items(): log.debug(f"Mapping facet {v}") if type(v) is XsdMaxLengthFacet: - mapped_type['maxLength'] = v.value + mapped_type["maxLength"] = v.value continue if type(v) is XsdMinLengthFacet: - mapped_type['minLength'] = v.value + mapped_type["minLength"] = v.value continue if type(v) is XsdPatternFacets: if len(v.regexps) > 1: - raise Exception (f"Multiple patterns given in facet {v} of {xst}") + raise Exception( + f"Multiple patterns given in facet {v} of {xst}" + ) p = v.regexps[0] - if (not p.startswith('^')) and (not p.endswith('$')): + if (not p.startswith("^")) and (not p.endswith("$")): p = f"^{p}$" - mapped_type['pattern'] = p + mapped_type["pattern"] = p continue - if type (v) is XsdMinInclusiveFacet: - mapped_type['minimum'] = v.value + if type(v) is XsdMinInclusiveFacet: + mapped_type["minimum"] = v.value continue - if type (v) is XsdMaxInclusiveFacet: - mapped_type['maximum'] = v.value + if type(v) is XsdMaxInclusiveFacet: + mapped_type["maximum"] = v.value continue - if type (v) is 
XsdMinExclusiveFacet: - mapped_type['exclusiveMinimum'] = v.value + if type(v) is XsdMinExclusiveFacet: + mapped_type["exclusiveMinimum"] = v.value continue - if type (v) is XsdMaxExclusiveFacet: - mapped_type['exclusiveMaximum'] = v.value + if type(v) is XsdMaxExclusiveFacet: + mapped_type["exclusiveMaximum"] = v.value continue - raise Exception (f"Unhandled facet {v}") + raise Exception(f"Unhandled facet {v}") if parent_type: - return { 'allOf' : [parent_type, mapped_type] } + return {"allOf": [parent_type, mapped_type]} return mapped_type diff --git a/utils/translate/__init__.py b/utils/translate/__init__.py index 86a33468..bfdeb093 100644 --- a/utils/translate/__init__.py +++ b/utils/translate/__init__.py @@ -17,30 +17,35 @@ mappings = [ SequenceMapping(), ] -def translate_schema (schema_path: str, ns_to_id_map: dict, schema_locations = []): + +def translate_schema( + schema_path: str, ns_to_id_map: dict, schema_locations=[] +): js = { - "$id" : "?", - "$schema" : "https://json-schema.org/draft/2020-12/schema", - "$defs" : {} + "$id": "?", + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$defs": {}, } logging.info(f"Translating schema {schema_path}") - xs = XMLSchema(schema_path, validation='lax', locations=schema_locations) - logging.info(f"Schema namespace: {xs.target_namespace}" ) + xs = XMLSchema(schema_path, validation="lax", locations=schema_locations) + logging.info(f"Schema namespace: {xs.target_namespace}") schema_id = ns_to_id_map[xs.target_namespace]["id"] - js['$id'] = schema_id + js["$id"] = schema_id TypeMapping.ns_to_id_map = ns_to_id_map elementList = [] for elementName, element in xs.elements.items(): logging.info(f"Processing element {elementName} : {element}") - elementList.append(TypeMapping.get_ref_for(element.type, element.namespaces[''])) + elementList.append( + TypeMapping.get_ref_for(element.type, element.namespaces[""]) + ) if len(elementList) == 1: - js['$ref'] = elementList[0]['$ref'] + js["$ref"] = elementList[0]["$ref"] elif len(elementList) > 1: - js['oneOf'] = elementList + js["oneOf"] = elementList descendent_types = {} for type_name, xsd_type in xs.types.items(): @@ -57,9 +62,7 @@ def translate_schema (schema_path: str, ns_to_id_map: dict, schema_locations = [ if j is None: raise Exception(f"Unmapped type {type_name} ({xsd_type})") js["$defs"][xsd_type.local_name] = j - logging.debug (f"Mapped {type_name} to {j}") + logging.debug(f"Mapped {type_name} to {j}") - print (descendent_types) + print(descendent_types) return js - - diff --git a/utils/translate_spec.py b/utils/translate_spec.py index e950b4ae..10083d43 100644 --- a/utils/translate_spec.py +++ b/utils/translate_spec.py @@ -8,68 +8,78 @@ from xmlschema import * from translate import * -logging.basicConfig(level = logging.INFO) +logging.basicConfig(level=logging.INFO) json_signature_struct = { - "properties" : { - "protected" : { "type" : "string" }, - "signature" : { "type" : "string" } + "properties": { + "protected": {"type": "string"}, + "signature": {"type": "string"}, }, - "required" : ["protected", "signature" ] + "required": ["protected", "signature"], } -def build_schema_locations (paths): + +def build_schema_locations(paths): schema_locations = [] for schemaFile in paths: try: - xs = XMLSchema(schemaFile, validation='skip') - schema_locations.append((xs.target_namespace, str(Path(schemaFile).resolve()))) - logging.debug (" [ {0} -> {1} ]".format(xs.target_namespace, schemaFile)) + xs = XMLSchema(schemaFile, validation="skip") + schema_locations.append( + 
(xs.target_namespace, str(Path(schemaFile).resolve())) + ) + logging.debug( + " [ {0} -> {1} ]".format(xs.target_namespace, schemaFile) + ) except XMLSchemaParseError as ex: - logging.debug (" [ {0} failed to parse: {1} ]".format(schemaFile, ex)) + logging.debug( + " [ {0} failed to parse: {1} ]".format(schemaFile, ex) + ) return schema_locations + def get_json(filename): with open(filename) as f: j = json.load(f) return j -def convert_ns_to_id (ns): - if ns.startswith('http://uri.etsi.org'): + +def convert_ns_to_id(ns): + if ns.startswith("http://uri.etsi.org"): c = ns.split("/") return f"ts_1{c[3]}{'_' + c[7] if len(c) > 7 else ''}_{c[5]}_{c[6]}" else: - return ns.replace("http://","").replace("/","_") + return ns.replace("http://", "").replace("/", "_") + -def convert_xsd_to_filename (xsd): +def convert_xsd_to_filename(xsd): f = Path(xsd) - return f.name.replace('.xsd', '.schema.json') + return f.name.replace(".xsd", ".schema.json") + if __name__ == "__main__": if len(sys.argv) < 2: - logging.error ("Usage: translate_spec.py path_to_config_file") + logging.error("Usage: translate_spec.py path_to_config_file") exit(-1) config = get_json(sys.argv[1]) - logging.info("Bulding ns map...") ns_map = {} - for location, settings in config['schemas'].items(): - xs = XMLSchema(location, validation='skip') + for location, settings in config["schemas"].items(): + xs = XMLSchema(location, validation="skip") ns = xs.target_namespace id = convert_ns_to_id(ns) ns_map[ns] = { - "id" : id, - "location" : str(Path(location).resolve()) + "id": id, + "location": str(Path(location).resolve()), } | settings logging.debug(ns_map) - + logging.info("Building schema locations") - schema_locations = [(k, v["location"]) for k,v in ns_map.items()] + schema_locations = [(k, v["location"]) for k, v in ns_map.items()] logging.debug(schema_locations) - output_path = Path(config['output']) + output_path = Path(config["output"]) if not output_path.exists(): logging.info("Creating output directory") os.mkdir(str(output_path)) @@ -78,40 +88,57 @@ if __name__ == "__main__": json_schemas = {} for schema_tuple in schema_locations: logging.info(f" Translating {schema_tuple}") - if 'skip' in ns_map[schema_tuple[0]]: + if "skip" in ns_map[schema_tuple[0]]: logging.info(f" Skipping {schema_tuple[0]}...") continue js = translate_schema(schema_tuple[1], ns_map, schema_locations) # TODO - Special case, get rid of XML Dsig signature and insert JSON signature - if schema_tuple[0] == 'http://uri.etsi.org/03120/common/2019/10/Core': - logging.info ("Modifying signature elements") - js['$defs']['HI1Message']['properties'].pop('xmldsig:Signature') - js['$defs']['HI1Message']['properties']['Signature'] = json_signature_struct + if schema_tuple[0] == "http://uri.etsi.org/03120/common/2019/10/Core": + logging.info("Modifying signature elements") + js["$defs"]["HI1Message"]["properties"].pop("xmldsig:Signature") + js["$defs"]["HI1Message"]["properties"]["Signature"] = ( + json_signature_struct + ) js_path = output_path / convert_xsd_to_filename(schema_tuple[1]) # TODO - Special case - abstract HI1Object if "Core" in schema_tuple[1]: - js["$defs"]['ConcreteHI1Object'] = { - 'oneOf' : [ - {'$ref' : 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'}, - {'$ref' : 'ts_103120_Task_2020_09#/$defs/LITaskObject'}, - {'$ref' : 'ts_103120_Task_2020_09#/$defs/LPTaskObject'}, - {'$ref' : 'ts_103120_Task_2020_09#/$defs/LDTaskObject'}, - {'$ref' : 'ts_103120_Document_2020_09#/$defs/DocumentObject'}, - {'$ref' : 
'ts_103120_Notification_2016_02#/$defs/NotificationObject'}, - {'$ref' : 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'}, - {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'}, - {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject'}, + js["$defs"]["ConcreteHI1Object"] = { + "oneOf": [ + { + "$ref": "ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject" + }, + {"$ref": "ts_103120_Task_2020_09#/$defs/LITaskObject"}, + {"$ref": "ts_103120_Task_2020_09#/$defs/LPTaskObject"}, + {"$ref": "ts_103120_Task_2020_09#/$defs/LDTaskObject"}, + { + "$ref": "ts_103120_Document_2020_09#/$defs/DocumentObject" + }, + { + "$ref": "ts_103120_Notification_2016_02#/$defs/NotificationObject" + }, + { + "$ref": "ts_103120_Delivery_2019_10#/$defs/DeliveryObject" + }, + { + "$ref": "ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject" + }, + { + "$ref": "ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject" + }, ] } json_string = json.dumps(js, indent=2) + "\n" if "Core" in schema_tuple[1]: - json_string = json_string.replace('"$ref": "#/$defs/HI1Object"', '"$ref": "#/$defs/ConcreteHI1Object"') + json_string = json_string.replace( + '"$ref": "#/$defs/HI1Object"', + '"$ref": "#/$defs/ConcreteHI1Object"', + ) - with open(str(js_path), 'w', newline='\n') as f: + with open(str(js_path), "w", newline="\n") as f: f.write(json_string) - json_schemas[js['$id']] = json.loads(json_string) + json_schemas[js["$id"]] = json.loads(json_string) diff --git a/utils/verify_json.py b/utils/verify_json.py index 329c0692..5f740801 100644 --- a/utils/verify_json.py +++ b/utils/verify_json.py @@ -1,4 +1,3 @@ - import argparse import sys import logging @@ -11,8 +10,19 @@ import json if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Verbose logging (can be specified multiple times)", + ) + parser.add_argument( + "-i", + "--input", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to input file (if absent, stdin is used)", + ) args = parser.parse_args() match args.verbose: @@ -27,28 +37,30 @@ if __name__ == "__main__": signed_json_text = args.input.read() args.input.close() - + j = json.loads(signed_json_text) - - protected_header = j['Signature']['protected'] - signature = j['Signature']['signature'] + + protected_header = j["Signature"]["protected"] + signature = j["Signature"]["signature"] # TODO some safety checks needed here # Remove the newline that appears from the console - if signed_json_text.endswith('\n'): signed_json_text = signed_json_text[:-1] - signed_json_text = signed_json_text.replace(protected_header, "").replace(signature, "") - - payload_bytes = signed_json_text.encode('utf-8') - payload_token = base64.b64encode(payload_bytes).decode('ascii') + if signed_json_text.endswith("\n"): + signed_json_text = signed_json_text[:-1] + signed_json_text = signed_json_text.replace(protected_header, "").replace( + signature, "" + ) + + payload_bytes = signed_json_text.encode("utf-8") + payload_token = base64.b64encode(payload_bytes).decode("ascii") # Un-pad the token, as per RFC7515 annex C - payload_token = payload_token.split('=')[0] - payload_token = payload_token.replace('+','-') - payload_token = payload_token.replace('/','_') 
+ payload_token = payload_token.split("=")[0] + payload_token = payload_token.replace("+", "-") + payload_token = payload_token.replace("/", "_") token = protected_header + "." + payload_token + "." + signature - result = jws.verify(token, key="secret_key", algorithms=['HS256']) - + result = jws.verify(token, key="secret_key", algorithms=["HS256"]) + print("Signature verified") - diff --git a/utils/xml_to_json.py b/utils/xml_to_json.py index 86f8ad7e..79cc6978 100644 --- a/utils/xml_to_json.py +++ b/utils/xml_to_json.py @@ -9,96 +9,96 @@ import xmltodict import argparse -def extract_prefixes (d): - return { k.split(':')[1]: d[k] for k in d.keys() if k.startswith("@xmlns:") } +def extract_prefixes(d): + return {k.split(":")[1]: d[k] for k in d.keys() if k.startswith("@xmlns:")} -def removePrefixes (o, prefixes): - if not isinstance(o, dict): return + +def removePrefixes(o, prefixes): + if not isinstance(o, dict): + return replacements = [] - for k,v in o.items(): + for k, v in o.items(): if isinstance(v, dict): removePrefixes(v, prefixes) if isinstance(v, list): for i in v: removePrefixes(i, prefixes) if ":" in k: - prefix = k.split(':')[0] + prefix = k.split(":")[0] if (prefix) in prefixes: - new_key = k.split(':')[1] - replacements.append( (k, new_key) ) + new_key = k.split(":")[1] + replacements.append((k, new_key)) for r in replacements: o[r[1]] = o.pop(r[0]) + object_namespaces = { - 'AuthorisationObject' : 'http://uri.etsi.org/03120/common/2020/09/Authorisation', - 'DeliveryObject' : 'http://uri.etsi.org/03120/common/2019/10/Delivery', - 'DocumentObject' : 'http://uri.etsi.org/03120/common/2020/09/Document', - 'NotificationObject' : 'http://uri.etsi.org/03120/common/2016/02/Notification', - 'LITaskObject' : 'http://uri.etsi.org/03120/common/2020/09/Task', - 'LPTaskObject' : 'http://uri.etsi.org/03120/common/2020/09/Task', - 'LDTaskObject' : 'http://uri.etsi.org/03120/common/2020/09/Task', - 'TrafficPolicyObject' : 'http://uri.etsi.org/03120/common/2022/07/TrafficPolicy', - 'TrafficRuleObject' : 'http://uri.etsi.org/03120/common/2022/07/TrafficPolicy' + "AuthorisationObject": "http://uri.etsi.org/03120/common/2020/09/Authorisation", + "DeliveryObject": "http://uri.etsi.org/03120/common/2019/10/Delivery", + "DocumentObject": "http://uri.etsi.org/03120/common/2020/09/Document", + "NotificationObject": "http://uri.etsi.org/03120/common/2016/02/Notification", + "LITaskObject": "http://uri.etsi.org/03120/common/2020/09/Task", + "LPTaskObject": "http://uri.etsi.org/03120/common/2020/09/Task", + "LDTaskObject": "http://uri.etsi.org/03120/common/2020/09/Task", + "TrafficPolicyObject": "http://uri.etsi.org/03120/common/2022/07/TrafficPolicy", + "TrafficRuleObject": "http://uri.etsi.org/03120/common/2022/07/TrafficPolicy", } coerce_to_list = [ - 'auth:AuthorisationApprovalDetails', - 'auth:AuthorisationFlag', - 'auth:CSPID', - 'common:ApproverContactDetails', - 'ActionRequest', - 'ActionResponse', - 'ListResponseRecord', - 'AssociatedObject', - 'doc:DocumentSignature', - 'doc:DocumentProperty', - 'notification:AssociatedObjectStatus', - 'task:ApprovalDetails', - 'task:TargetIdentifierValue', - 'task:DeliveryDestination', - 'task:TaskFlag', - 'task:AlternativePreservationReference', - 'task:ApprovalDetails', - 'task:ObservedTimes', - 'task:RequestValue', - 'task:RequestSubtype', - 'task:LDDeliveryDestination', - 'task:LDTaskFlag', - 'task:TrafficPolicyReference', - 'tp:TrafficRuleReference', - 'tp:Criteria', - 'common:DictionaryEntry', - 'dictionaries:Dictionary', - 
'config:TargetFormatTypeDefinitionEntry', - 'config:SupportedLIWorkflowEndpoint', - 'config:SupportedLPWorkflowEndpoint', + "auth:AuthorisationApprovalDetails", + "auth:AuthorisationFlag", + "auth:CSPID", + "common:ApproverContactDetails", + "ActionRequest", + "ActionResponse", + "ListResponseRecord", + "AssociatedObject", + "doc:DocumentSignature", + "doc:DocumentProperty", + "notification:AssociatedObjectStatus", + "task:ApprovalDetails", + "task:TargetIdentifierValue", + "task:DeliveryDestination", + "task:TaskFlag", + "task:AlternativePreservationReference", + "task:ApprovalDetails", + "task:ObservedTimes", + "task:RequestValue", + "task:RequestSubtype", + "task:LDDeliveryDestination", + "task:LDTaskFlag", + "task:TrafficPolicyReference", + "tp:TrafficRuleReference", + "tp:Criteria", + "common:DictionaryEntry", + "dictionaries:Dictionary", + "config:TargetFormatTypeDefinitionEntry", + "config:SupportedLIWorkflowEndpoint", + "config:SupportedLPWorkflowEndpoint", ] coerce_to_int = [ - 'ActionIdentifier', - 'delivery:SequenceNumber', - 'task:Order', - 'ErrorCode', - 'Generation', + "ActionIdentifier", + "delivery:SequenceNumber", + "task:Order", + "ErrorCode", + "Generation", ] -coerce_to_bool = [ - 'delivery:LastSequence' -] +coerce_to_bool = ["delivery:LastSequence"] -coerce_to_empty = [ - 'GETCSPCONFIG' -] +coerce_to_empty = ["GETCSPCONFIG"] coerce_null_to_empty = [ - 'SupportedLIWorkflowEndpoints', - 'SupportedLPWorkflowEndpoints', - 'config:AssociatedLDRequestSubtypes', - 'config:AssociatedLPRequestSubtypes', - 'config:AssociatedLIRequestSubtypes', + "SupportedLIWorkflowEndpoints", + "SupportedLPWorkflowEndpoints", + "config:AssociatedLDRequestSubtypes", + "config:AssociatedLPRequestSubtypes", + "config:AssociatedLIRequestSubtypes", ] -def postprocessor (path, key, value): + +def postprocessor(path, key, value): if key == "@xsi:type": object_name = value.split(":")[-1] if object_name in object_namespaces.keys(): @@ -115,10 +115,22 @@ def postprocessor (path, key, value): return key, {} return key, value + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Verbose logging (can be specified multiple times)", + ) + parser.add_argument( + "-i", + "--input", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to input file (if absent, stdin is used)", + ) args = parser.parse_args() match args.verbose: @@ -136,9 +148,8 @@ if __name__ == "__main__": logging.debug(s) - d = xmltodict.parse(s, - force_list=tuple(coerce_to_list), - postprocessor=postprocessor - )['HI1Message'] + d = xmltodict.parse( + s, force_list=tuple(coerce_to_list), postprocessor=postprocessor + )["HI1Message"] print(json.dumps(d, indent=2)) -- GitLab From f7ca2928374ea37cc5d5862fb8b74f0352ff9ab1 Mon Sep 17 00:00:00 2001 From: vanschelts Date: Thu, 27 Feb 2025 14:05:33 +0000 Subject: [PATCH 2/9] Check JSON formatting --- .gitlab-ci.yml | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 82eb0f9c..b207c66b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -19,11 +19,24 @@ preflight: script: - forgelib-preflight https://$CI_SERVER_HOST $CI_PROJECT_ID $CI_MERGE_REQUEST_IID -check_py: - image: 
"forge.etsi.org:5050/li/schemas-definitions/forgelib" +#check_py: +# image: "forge.etsi.org:5050/li/schemas-definitions/forgelib" +# stage: check +# script: +# - ruff format --check . + +check_json_format: + image: "forge.etsi.org:5050/li/schemas-definitions/json" stage: check script: - - ruff format --check . + - | + find . -type f -name "*.json" -print0 | while IFS= read -r -d '' file; do + if ! diff -q <(jq . "$file") "$file" >/dev/null 2>&1; then + echo "❌ Unformatted JSON: $file" + INVALID_JSON_FILES=true + fi + done + - if [ "$INVALID_JSON_FILES" = true ]; then echo "Some JSON files are not formatted. Please run jq to fix them."; process_asn: image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest" -- GitLab From ad9be9b55e18db56c01042ecec63213bfc4e8cb7 Mon Sep 17 00:00:00 2001 From: vanschelts Date: Thu, 27 Feb 2025 14:06:45 +0000 Subject: [PATCH 3/9] Clean check --- .gitlab-ci.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b207c66b..9b6bba92 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -33,10 +33,8 @@ check_json_format: find . -type f -name "*.json" -print0 | while IFS= read -r -d '' file; do if ! diff -q <(jq . "$file") "$file" >/dev/null 2>&1; then echo "❌ Unformatted JSON: $file" - INVALID_JSON_FILES=true fi done - - if [ "$INVALID_JSON_FILES" = true ]; then echo "Some JSON files are not formatted. Please run jq to fix them."; process_asn: image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest" -- GitLab From 21eca6133c1217c579608c40ad7b5c25a5b9a362 Mon Sep 17 00:00:00 2001 From: vanschelts Date: Thu, 27 Feb 2025 14:08:42 +0000 Subject: [PATCH 4/9] Run check --- .gitlab-ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 9b6bba92..b207c66b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -33,8 +33,10 @@ check_json_format: find . -type f -name "*.json" -print0 | while IFS= read -r -d '' file; do if ! diff -q <(jq . "$file") "$file" >/dev/null 2>&1; then echo "❌ Unformatted JSON: $file" + INVALID_JSON_FILES=true fi done + - if [ "$INVALID_JSON_FILES" = true ]; then echo "Some JSON files are not formatted. Please run jq to fix them."; process_asn: image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest" -- GitLab From 07d928a426819d5f651b5803739f5f2fa302fccc Mon Sep 17 00:00:00 2001 From: vanschelts Date: Thu, 27 Feb 2025 14:11:31 +0000 Subject: [PATCH 5/9] Make bash happy --- .gitlab-ci.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b207c66b..17fe9ba5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -30,13 +30,17 @@ check_json_format: stage: check script: - | + INVALID_JSON_FILES=false find . -type f -name "*.json" -print0 | while IFS= read -r -d '' file; do if ! diff -q <(jq . "$file") "$file" >/dev/null 2>&1; then echo "❌ Unformatted JSON: $file" INVALID_JSON_FILES=true fi done - - if [ "$INVALID_JSON_FILES" = true ]; then echo "Some JSON files are not formatted. Please run jq to fix them."; + if $INVALID_JSON_FILES; then + echo "Some JSON files are not formatted. Please run jq to fix them." 
+        exit 1
+      fi
 
 process_asn:
   image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest"
-- 
GitLab


From bf0f29dd66e5979789619f3418552f9560511db5 Mon Sep 17 00:00:00 2001
From: vanschelts
Date: Thu, 27 Feb 2025 14:15:52 +0000
Subject: [PATCH 6/9] Correct if statement

---
 .gitlab-ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 17fe9ba5..5a081bd5 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -37,7 +37,7 @@ check_json_format:
           INVALID_JSON_FILES=true
         fi
       done
-      if $INVALID_JSON_FILES; then
+      if [ "$INVALID_JSON_FILES" = true ]; then
         echo "Some JSON files are not formatted. Please run jq to fix them."
         exit 1
       fi
-- 
GitLab


From dc17a0364fa184eeef436e3b8c845c6ffd26a5f7 Mon Sep 17 00:00:00 2001
From: vanschelts
Date: Thu, 27 Feb 2025 14:21:23 +0000
Subject: [PATCH 7/9] Fix check?

---
 .gitlab-ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 5a081bd5..4400e85e 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -37,7 +37,7 @@ check_json_format:
           INVALID_JSON_FILES=true
         fi
       done
-      if [ "$INVALID_JSON_FILES" = true ]; then
+      if [ "$INVALID_JSON_FILES" == "true" ]; then
         echo "Some JSON files are not formatted. Please run jq to fix them."
         exit 1
       fi
-- 
GitLab


From a1a22cc97ccab632e172f7315e0a1e7ea1fd72dc Mon Sep 17 00:00:00 2001
From: vanschelts
Date: Thu, 27 Feb 2025 14:24:21 +0000
Subject: [PATCH 8/9] New approach

---
 .gitlab-ci.yml | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 4400e85e..31cadce7 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -30,17 +30,14 @@ check_json_format:
   stage: check
   script:
     - |
-      INVALID_JSON_FILES=false
+      EXIT_CODE=0
       find . -type f -name "*.json" -print0 | while IFS= read -r -d '' file; do
         if ! diff -q <(jq . "$file") "$file" >/dev/null 2>&1; then
           echo "❌ Unformatted JSON: $file"
-          INVALID_JSON_FILES=true
+          EXIT_CODE=1
         fi
       done
-      if [ "$INVALID_JSON_FILES" == "true" ]; then
-        echo "Some JSON files are not formatted. Please run jq to fix them."
-        exit 1
-      fi
+      exit $EXIT_CODE
 
 process_asn:
   image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest"
-- 
GitLab


From 95f0621c8900409a92a578706a9c26bfcb29ea77 Mon Sep 17 00:00:00 2001
From: vanschelts
Date: Thu, 27 Feb 2025 14:32:33 +0000
Subject: [PATCH 9/9] Better check

---
 .gitlab-ci.yml | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 31cadce7..3a87e7de 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -30,14 +30,15 @@ check_json_format:
   stage: check
   script:
     - |
-      EXIT_CODE=0
       find . -type f -name "*.json" -print0 | while IFS= read -r -d '' file; do
         if ! diff -q <(jq . "$file") "$file" >/dev/null 2>&1; then
           echo "❌ Unformatted JSON: $file"
-          EXIT_CODE=1
+          touch /tmp/format_json
        fi
       done
-      exit $EXIT_CODE
+      if [ -f /tmp/format_json ]; then
+        exit 1
+      fi
 
 process_asn:
   image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest"
-- 
GitLab
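
A closing note on the check_json_format job: `find ... -print0 | while ... done` runs the loop body in a subshell, so variables set inside it (INVALID_JSON_FILES, EXIT_CODE) never reach the enclosing shell, which is why the series converges on signalling failure through a marker file on disk. The same check can be written without that pitfall in Python, the language the rest of the repository uses. The sketch below is illustrative only and is not part of the patch series: the script name is made up, and it assumes `jq .` output matches json.dumps with 2-space indent, unescaped non-ASCII, and a trailing newline, which should be verified against the jq version in the CI image.

    #!/usr/bin/env python3
    # check_json_format.py - hypothetical stand-in for the shell-based CI
    # check above; not part of the patch series.
    import json
    import sys
    from pathlib import Path


    def canonical(text: str) -> str:
        # Assumes jq's default style: 2-space indent, unescaped non-ASCII,
        # trailing newline. Adjust if the canonical formatter differs.
        return json.dumps(json.loads(text), indent=2, ensure_ascii=False) + "\n"


    unformatted = []
    for path in Path(".").rglob("*.json"):
        original = path.read_text(encoding="utf-8")
        try:
            if original != canonical(original):
                print(f"Unformatted JSON: {path}")
                unformatted.append(path)
        except json.JSONDecodeError as exc:
            print(f"Invalid JSON: {path}: {exc}")
            unformatted.append(path)

    # A non-zero exit fails the CI job, mirroring the exit 1 above.
    sys.exit(1 if unformatted else 0)

Run from the repository root, it prints each offending file and exits non-zero, which is the behaviour the last few commits in the series are working towards.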