"""Command-line JSON Schema (Draft 2020-12) validator.

Validates a JSON instance document (from a file or stdin) against a primary
schema, optionally resolving ``$ref``s against every ``*.schema.json`` file
found under one or more supporting schema directories.

When validation fails on the ``Payload`` ``oneOf`` constraint, the script
re-validates progressively smaller fragments of the document against the
specific ``ts_103120_*`` sub-schemas so that the error finally raised points
at the real problem rather than at the outer ``oneOf``.
"""
import argparse
import json
import logging
import sys
from pathlib import Path

from jsonschema import validate, RefResolver, Draft202012Validator
from jsonschema.exceptions import ValidationError

# Schema paths at which a ``oneOf`` failure hides a more specific error.
_PAYLOAD_ONEOF_PATH = ['properties', 'Payload', 'oneOf']
_ACTION_ONEOF_PATHS = (
    ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'],
    ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf'],
)
_HI1_OBJECT_ONEOF_PATH = ['properties', 'HI1Object', 'oneOf']

# Maps the local part of an object's ``@xsi:type`` to the schema fragment
# that object must be validated against.
_OBJECT_REFS = {
    'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
    'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
    'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
    'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
    'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
    'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
    'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
    'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
    'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
}


def handle_uri(u):
    """Print *u* to standard output."""
    print(u)


def load_json(path: "str | Path"):
    """Return the parsed JSON content of the file at *path*.

    The file is read as UTF-8 (the JSON interchange encoding) and is always
    closed, even if parsing fails.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _validate_ref(ref, instance, resolver):
    """Validate *instance* against the schema fragment named by *ref*."""
    Draft202012Validator({"$ref": ref}, resolver=resolver).validate(instance)


def _explain_object_error(ex3, resolver):
    """Re-validate a failing HI1Object against its concrete object schema.

    Raises ``ValidationError``: either *ex3* itself when no narrower schema
    can be chosen, or the more specific error found by the re-validation.
    """
    logging.error("Error detected in verb")
    # The final level of validation is for the actual HI1Object validation
    if list(ex3.schema_path) != _HI1_OBJECT_ONEOF_PATH:
        logging.error("Error not inside HI1Object")
        raise ex3
    hi1_object = ex3.instance
    xsi_type = hi1_object.get('@xsi:type') if isinstance(hi1_object, dict) else None
    # Keep only the local name of a "{namespace}LocalName" style type.
    object_type = xsi_type.split('}')[-1] if isinstance(xsi_type, str) else None
    object_ref = _OBJECT_REFS.get(object_type)
    if object_ref is None:
        # Surface the schema error instead of an unrelated KeyError.
        logging.error("Unrecognised or missing HI1Object @xsi:type: %r", xsi_type)
        raise ex3
    _validate_ref(object_ref, hi1_object, resolver)


def _explain_action_error(ex2, resolver):
    """Re-validate a failing action against its verb-specific schema.

    Similar to the payload step, this is inner validation to get a more
    useful error when something fails the verb ``oneOf`` constraint.
    """
    logging.error("Error detected in ActionRequests/ActionResponses")
    error_path = list(ex2.schema_path)
    if error_path not in _ACTION_ONEOF_PATHS:
        logging.error("Error not in inner Request/Response allOf/oneOf constraint")
        raise ex2
    action = ex2.instance
    # Ignoring ActionIdentifier, the first remaining key is the verb. The
    # instance is inspected rather than popped so the document is not mutated.
    verbs = [k for k in action if k != 'ActionIdentifier'] if isinstance(action, dict) else []
    if not verbs:
        logging.error("No verb found alongside ActionIdentifier")
        raise ex2
    verb = verbs[0]
    message = "Request" if error_path[1] == "ActionRequests" else "Response"
    try:
        _validate_ref(f"ts_103120_Core_2019_10#/$defs/{verb}{message}", action[verb], resolver)
    except ValidationError as ex3:
        _explain_object_error(ex3, resolver)


def _explain_payload_error(instance_doc, resolver, ex):
    """Drill into a ``Payload`` ``oneOf`` failure to find the real error.

    Any failure within the Payload element surfaces as a failure of the
    Payload ``oneOf`` constraint, which is not helpful. This validates the
    RequestPayload or ResponsePayload explicitly instead. It returns normally
    only if that narrower validation unexpectedly succeeds.
    """
    payload = instance_doc['Payload']
    for kind in ('RequestPayload', 'ResponsePayload'):
        if isinstance(payload, dict) and kind in payload:
            break
    else:
        logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
        raise ex
    try:
        _validate_ref(f"ts_103120_Core_2019_10#/$defs/{kind}", payload[kind], resolver)
    except ValidationError as ex2:
        _explain_action_error(ex2, resolver)


def main() -> None:
    """Parse arguments, load the schemas, validate the input and report.

    Exits with status -1 when the document is invalid and the drill-down
    did not raise a more specific ``ValidationError``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
    parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
    parser.add_argument('schema', help="Primary schema to validate against")
    args = parser.parse_args()

    # ``count`` leaves the attribute as None when -v is never given.
    verbosity = args.verbose or 0
    if verbosity >= 2:
        logging.basicConfig(level=logging.DEBUG)
    elif verbosity == 1:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    logging.debug("Arguments: %s", args)

    # Close the input even if it is not valid JSON.
    with args.input:
        instance_doc = json.load(args.input)

    main_schema = load_json(args.schema)
    schema_dict = {main_schema['$id']: main_schema}

    if args.schemadir:
        schema_paths = []
        for d in args.schemadir:
            logging.info(f"Searching {d}")
            found = list(Path(d).rglob("*.schema.json"))
            logging.info(found)
            schema_paths += found
        logging.info(f"Schema files loaded: {schema_paths}")
        # Later files win when two schemas declare the same $id.
        schema_dict |= {s['$id']: s for s in map(load_json, schema_paths)}

    logging.info("Schema IDs loaded: %s", list(schema_dict))
    logging.debug("Instance doc: %s", instance_doc)
    logging.debug("Main schema: %s", main_schema)

    # NOTE(review): RefResolver is deprecated in recent jsonschema releases in
    # favour of the ``referencing`` library; kept to avoid a new dependency.
    resolver = RefResolver(None, referrer=None, store=schema_dict)
    validator = Draft202012Validator(main_schema, resolver=resolver)
    try:
        validator.validate(instance_doc)
    except ValidationError as ex:
        if list(ex.schema_path) == _PAYLOAD_ONEOF_PATH:
            logging.error("Error detected validating payload oneOf - attempting explicit validation...")
            _explain_payload_error(instance_doc, resolver, ex)
        # Reached when no drill-down applied or it found nothing narrower:
        # report the original error so the failure is never silent.
        logging.error("Validation failed: %s", ex.message)
        sys.exit(-1)

    logging.info("Done")


if __name__ == "__main__":
    main()