.gitlab-ci.yml  +6 −0

@@ -19,6 +19,12 @@ preflight:
   script:
     - forgelib-preflight https://$CI_SERVER_HOST $CI_PROJECT_ID $CI_MERGE_REQUEST_IID

+check_py:
+  image: "forge.etsi.org:5050/li/schemas-definitions/forgelib"
+  stage: check
+  script:
+    - ruff format --check .
+
 process_asn:
   image: "forge.etsi.org:5050/li/schemas-definitions/asn1test:latest"
   stage: check
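Note: the new check_py job only verifies formatting; ruff format --check . exits non-zero and lists any file that would be reformatted, but it does not rewrite files. Assuming ruff is installed locally, the same check can be reproduced with ruff format --check ., and ruff format . applies the formatting that the remaining files in this merge request already contain.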
103705/validate.py  +34 −18

@@ -6,7 +6,6 @@
 import logging
 import argparse
-# filename = sys.argv[1]
 # def load_json (path):

@@ -38,20 +37,39 @@
 # validate(json_instance, ext_schema)
 # print ("OK")

 def handle_uri(u):
     print(u)

 def load_json(path: str):
     with open(path) as f:
         return json.load(f)

 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
-    parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
-    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
-    parser.add_argument('schema', help="Primary schema to validate against")
+    parser.add_argument(
+        "-s",
+        "--schemadir",
+        action="append",
+        help="Directory containing supporting schema files to use for validation",
+    )
+    parser.add_argument(
+        "-v",
+        "--verbose",
+        action="count",
+        help="Verbose logging (can be specified multiple times)",
+    )
+    parser.add_argument(
+        "-i",
+        "--input",
+        type=argparse.FileType("r"),
+        default=sys.stdin,
+        help="Path to input file (if absent, stdin is used)",
+    )
+    parser.add_argument("schema", help="Primary schema to validate against")

     args = parser.parse_args()

@@ -68,7 +86,7 @@
     instance_doc = json.loads(args.input.read())
     args.input.close()
     main_schema = load_json(args.schema)
-    schema_dict = { main_schema['$id'] : main_schema }
+    schema_dict = {main_schema["$id"]: main_schema}

     if args.schemadir:
         schema_paths = []

@@ -77,16 +95,14 @@
         logging.info(f"Schema files loaded: {schema_paths}")

         schemas_json = [json.load(p.open()) for p in schema_paths]
-        schema_dict = schema_dict | { s['$id'] : s for s in schemas_json }
+        schema_dict = schema_dict | {s["$id"]: s for s in schemas_json}

     logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}")
     logging.debug(f"Instance doc: {instance_doc}")
     logging.debug(f"Main schema: {main_schema}")

     resolver = RefResolver(None, referrer=None, store=schema_dict)

     v = Draft202012Validator(main_schema, resolver=resolver)
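For reference, the command-line interface of validate.py is unchanged by this reformat: a positional primary schema, a repeatable -s/--schemadir, -v/--verbose, and -i/--input defaulting to stdin. A typical invocation would look like the following, where the file and directory names are placeholders rather than paths from this repository:

    python validate.py -s supporting_schemas/ -i instance.json main.schema.json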
103705/validation/validate_705.py  +58 −31

@@ -5,9 +5,11 @@
 from pathlib import Path
 import logging
 import argparse

 def handle_uri(u):
     print(u)

 def load_json(path: str):
     with open(path) as f:
         return json.load(f)

@@ -16,9 +18,25 @@ def load_json(path : str):
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
-    parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
-    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
+    parser.add_argument(
+        "-s",
+        "--schemadir",
+        action="append",
+        help="Directory containing supporting schema files to use for validation",
+    )
+    parser.add_argument(
+        "-v",
+        "--verbose",
+        action="count",
+        help="Verbose logging (can be specified multiple times)",
+    )
+    parser.add_argument(
+        "-i",
+        "--input",
+        type=argparse.FileType("r"),
+        default=sys.stdin,
+        help="Path to input file (if absent, stdin is used)",
+    )

     args = parser.parse_args()

@@ -36,20 +54,22 @@
     args.input.close()

     config = {
-        'schema_include_dirs' : [
-            '../schema/',
-            '../../103280/',
+        "schema_include_dirs": [
+            "../schema/",
+            "../../103280/",
         ],
-        'main_schema_doc' : '../schema/response.schema.json'
+        "main_schema_doc": "../schema/response.schema.json",
     }

     rootPath = Path(sys.argv[0]).parent
-    main_schema = load_json(str(rootPath / config['main_schema_doc']))
-    schema_dict = { main_schema['$id'] : main_schema }
+    main_schema = load_json(str(rootPath / config["main_schema_doc"]))
+    schema_dict = {main_schema["$id"]: main_schema}

     schema_paths = []
-    for d in config['schema_include_dirs']:
-        schema_paths += [f for f in (rootPath / Path(d)).rglob("*.schema.json")]
+    for d in config["schema_include_dirs"]:
+        schema_paths += [
+            f for f in (rootPath / Path(d)).rglob("*.schema.json")
+        ]
     logging.info(f"Core schema files loaded: {schema_paths}")

     if args.schemadir:
         for d in args.schemadir:

@@ -58,38 +78,45 @@
     else:
         logging.info(f"No CSP schema files loaded")

     schemas_json = [json.load(p.open()) for p in schema_paths]
-    schema_dict = schema_dict | { s['$id'] : s for s in schemas_json }
+    schema_dict = schema_dict | {s["$id"]: s for s in schemas_json}

     logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}")
     logging.debug(f"Instance doc: {instance_doc}")
     logging.debug(f"Main schema: {main_schema}")

     resolver = RefResolver(None, referrer=None, store=schema_dict)

     logging.info("Performing ETSI validation")
     v = Draft202012Validator(main_schema, resolver=resolver)
     v.validate(instance_doc)

     logging.info("Building record type dictionary")
-    type_dict = instance_doc['recordSetDescription']['recordTypes']
+    type_dict = instance_doc["recordSetDescription"]["recordTypes"]
     logging.debug(type_dict)
     ref_dict = {k: {"$ref": v} for k, v in type_dict.items()}
-    validator_dict = { k : Draft202012Validator(ref_dict[k], resolver=resolver) for k,v in ref_dict.items()}
+    validator_dict = {
+        k: Draft202012Validator(ref_dict[k], resolver=resolver)
+        for k, v in ref_dict.items()
+    }
     logging.debug(ref_dict)

     logging.info("Validating records")
-    for r in instance_doc['recordSet']:
-        type_key = r['type']
+    for r in instance_doc["recordSet"]:
+        type_key = r["type"]
         if type_key not in type_dict.keys():
-            logging.error(f"Record {r['id']} has type {type_key}, not in recordType dict")
+            logging.error(
+                f"Record {r['id']} has type {type_key}, not in recordType dict"
+            )
         type_ref = type_dict[type_key]
-        type_schema_id = type_ref.split('#')[0]
-        logging.info(f"Using {type_schema_id} to validate {type_ref} in record {r['id']}")
+        type_schema_id = type_ref.split("#")[0]
+        logging.info(
+            f"Using {type_schema_id} to validate {type_ref} in record {r['id']}"
+        )
         if not (type_key in validator_dict.keys()):
-            logging.error(f'Type key {type_key} from type {type_ref} in record {r["id"]} not in validator dictionary')
+            logging.error(
+                f"Type key {type_key} from type {type_ref} in record {r['id']} not in validator dictionary"
+            )
             print(ref_dict)
         v = validator_dict[type_key]
         v.validate(r)
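The validate_705.py changes above are formatting-only, but the underlying pattern is easy to lose in the diff: every schema is pre-registered by its $id in a RefResolver store, the whole instance is validated against the main schema, and each record in recordSet is then validated again through a {"$ref": ...} wrapper pointing at its declared type. A minimal self-contained sketch of that pattern follows; the $id values, schemas, and record are invented for illustration, and RefResolver is deprecated in newer jsonschema releases (it still works, with a DeprecationWarning).

    from jsonschema import Draft202012Validator, RefResolver

    # Hypothetical schemas, not taken from the repository.
    main_schema = {
        "$id": "https://example.org/response.schema.json",
        "type": "object",
        "properties": {"recordSet": {"type": "array"}},
    }
    record_schema = {
        "$id": "https://example.org/record.schema.json",
        "type": "object",
        "required": ["id", "type"],
    }

    # Pre-register every schema by its $id so $refs resolve from memory,
    # never over the network.
    schema_dict = {s["$id"]: s for s in (main_schema, record_schema)}
    resolver = RefResolver(None, referrer=None, store=schema_dict)

    # Whole-document validation against the main schema.
    instance = {"recordSet": [{"id": "r1", "type": "call"}]}
    Draft202012Validator(main_schema, resolver=resolver).validate(instance)

    # Per-record validation through a $ref wrapper, as validate_705.py does
    # for each entry declared in recordSetDescription.recordTypes.
    wrapper = {"$ref": "https://example.org/record.schema.json"}
    for record in instance["recordSet"]:
        Draft202012Validator(wrapper, resolver=resolver).validate(record)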
103707/testing/validate_examples.py  +13 −16

@@ -3,39 +3,36 @@
 import sys
 from pathlib import Path
 from pprint import pprint

-if __name__ == '__main__':
+if __name__ == "__main__":
     if sys.version_info <= (3, 5):
-        sys.exit('ERROR: You need at least Python 3.5 to run this tool')
+        sys.exit("ERROR: You need at least Python 3.5 to run this tool")

     try:
         from lxml import etree
     except ImportError:
-        sys.exit('ERROR: You need to install the Python lxml library')
+        sys.exit("ERROR: You need to install the Python lxml library")

     try:
         import xmlschema
     except ImportError:
-        sys.exit('ERROR: You need to install the xml schema library')
+        sys.exit("ERROR: You need to install the xml schema library")

-    extraSchemas = [ 'examples/FooServiceSchema.xsd', 'TS_103_280_v020301.xsd' ]
+    extraSchemas = ["examples/FooServiceSchema.xsd", "TS_103_280_v020301.xsd"]

     locations = []
     for schemaFile in extraSchemas:
-        xs = xmlschema.XMLSchema(schemaFile, validation='skip')
+        xs = xmlschema.XMLSchema(schemaFile, validation="skip")
         locations.append((xs.default_namespace, str(Path(schemaFile))))

-    coreSchema = xmlschema.XMLSchema('TS_103_707_v010201.xsd', locations=locations)
+    coreSchema = xmlschema.XMLSchema(
+        "TS_103_707_v010201.xsd", locations=locations
+    )

     for schema in extraSchemas:
         newSchema = xmlschema.XMLSchema(schema)
         coreSchema.import_schema(newSchema.default_namespace, schema)

-    examples = glob.glob('examples/*.xml')
+    examples = glob.glob("examples/*.xml")
     for example in examples:
         try:
             coreSchema.validate(example)

@@ -43,4 +40,4 @@
         except Exception as ex:
             print("{0} failed validation: {1}".format(example, ex))

-    print ('Done')
\ No newline at end of file
+    print("Done")
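Behaviour is unchanged here as well: the script (which appears intended to be run from 103707/testing/, since its schema and examples/ paths are relative) loads TS_103_707_v010201.xsd with the extra schemas imported, validates every examples/*.xml file, and prints a message per failing example rather than aborting on the first error.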
create_attachments.py  +3 −1

@@ -46,7 +46,9 @@ def recursively_zip_directory(directory: Path, zipname: str, recursion=0):
         elif f.is_dir():
             zipname = f.with_suffix(".zip").name
             logging.info(f"{'':{recursion * 4}}Adding archive: {f}")
-            recurse_buffer = recursively_zip_directory(f, zipname, recursion + 1)
+            recurse_buffer = recursively_zip_directory(
+                f, zipname, recursion + 1
+            )
             zip.writestr(zipname, recurse_buffer.getvalue())

     return buffer