Skip to content
validate_json.py 6.25 KiB
Newer Older
canterburym's avatar
canterburym committed
import sys
from jsonschema import validate, RefResolver, Draft202012Validator
from jsonschema.exceptions import ValidationError
canterburym's avatar
canterburym committed
import json
from pathlib import Path
import logging
import argparse


def handle_uri(u):
    print(u)

def load_json(path : str):
    with open(path) as f:
        return json.load(f)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
    parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
canterburym's avatar
canterburym committed
    parser.add_argument('schema', help="Primary schema to validate against")    

    args = parser.parse_args()

    match args.verbose:
        case v if v and v >= 2:
            logging.basicConfig(level=logging.DEBUG)
        case 1:
            logging.basicConfig(level=logging.INFO)
        case _:
            logging.basicConfig(level=logging.WARNING)

    logging.debug(f"Arguments: {args}")

    instance_doc = json.loads(args.input.read())
    args.input.close()
canterburym's avatar
canterburym committed
    main_schema = load_json(args.schema)    
    schema_dict = { main_schema['$id'] : main_schema }

    if args.schemadir:
        schema_paths = []    
        for d in args.schemadir:
            logging.info(f"Searching {d}")
            logging.info(list(Path(d).rglob("*.schema.json")))
canterburym's avatar
canterburym committed
            schema_paths += [f for f in Path(d).rglob("*.schema.json")]
        logging.info(f"Schema files loaded: {schema_paths}")

        schemas_json = [json.load(p.open()) for p in schema_paths]
        schema_dict = schema_dict | { s['$id'] : s for s in schemas_json }

    logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}")

    logging.debug (f"Instance doc: {instance_doc}")
    logging.debug (f"Main schema: {main_schema}")

    resolver = RefResolver(None, 
                           referrer=None, 
                           store=schema_dict)
    
    v = Draft202012Validator(main_schema, resolver=resolver)
    try:
        v.validate(instance_doc)
    except ValidationError as ex:
        # Any failure within the Payload element results in a failure against the oneOf constraint in the Payload element
        # This isn't terribly helpful in working out what is actually wrong, so in this case we attempt an explicit
        # validation against the relevant oneOf alternation to try and get a more useful validation error
        if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']:
            logging.error ("Error detected validating payload oneOf - attempting explicit validation...")
            try:
                if 'RequestPayload' in instance_doc['Payload'].keys():
                    request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" }  
                    v = Draft202012Validator(request_fragment_schema, resolver=resolver)
                    v.validate(instance_doc['Payload']['RequestPayload'])
                elif 'ResponsePayload' in instance_doc['Payload'].keys():
                    request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" }  
                    v = Draft202012Validator(request_fragment_schema, resolver=resolver)
                    v.validate(instance_doc['Payload']['ResponsePayload'])
                else:
                    logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
                    raise ex
            except ValidationError as ex2:
                # Similar to above, this is inner validation to try and get a more useful error in the event
                # that something fails the verb oneOf constraint
                logging.error("Error detected in ActionRequests/ActionResponses")
                error_path = list(ex2.schema_path)
                if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']:
                    logging.error("Error not in inner Request/Response allOf/oneOf constraint")
                    raise ex2
                j = ex2.instance
                j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb
                verb = list(j.keys())[0]
                message = "Request" if error_path[1] == "ActionRequests" else "Response"
                v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=resolver)
                try:
                    v.validate(j[verb])
                except ValidationError as ex3:
                    logging.error("Error detected in verb")
                    # The final level of validation is for the actual HI1Object validation
                    if list(ex3.schema_path) != ['properties', 'HI1Object', 'oneOf']:
                        logging.error("Error not inside HI1Object")
                        raise ex3
                    object_type = ex3.instance['@xsi:type'].split('}')[-1]
                    object_ref = {
                        'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
                        'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
                        'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
canterburym's avatar
canterburym committed
                        'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
                        'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
                        'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
                        'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
canterburym's avatar
canterburym committed
                        'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
                        'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
                    }[object_type]
                    v = Draft202012Validator({"$ref" : object_ref}, resolver=resolver)
                    v.validate(ex3.instance)

        exit(-1)
canterburym's avatar
canterburym committed
    logging.info("Done")