Skip to content
json_validator.py 7.84 KiB
Newer Older
canterburym's avatar
canterburym committed
import sys
from jsonschema import validate, RefResolver, Draft202012Validator
from jsonschema.exceptions import ValidationError
import json
from pathlib import Path
import logging
import argparse
from itertools import chain

class JsonValidator:
    def __init__(self, core_schema: str, other_schemas : dict):
        self._core_schema = json.load(Path(core_schema).open())
        self._schema_dict = { self._core_schema['$id'] : self._core_schema }
        self._supporting_paths = []
        for thing in other_schemas:
            path = Path(thing)
            if path.is_dir():
                logging.debug(f"Searching {path} for schema files")
                self._supporting_paths.extend(path.rglob("*.schema.json"))
            else:
                logging.debug(f"Appending {path} as schema file")
                self._supporting_paths.append(path)
        logging.info(f"Supporting schema paths: {self._supporting_paths}")
        self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths]
        self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas }
        logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}")
        self._resolver = RefResolver(None, 
                        referrer=None, 
                        store=self._schema_dict)
        logging.info("Created RefResolver")
        self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver)
        logging.info("Created validator")

    def validate(self, instance_doc: str):
        errors = list(self._validator.iter_errors(instance_doc))
        return errors
    
class TS103120Validator (JsonValidator):
    def __init__ (self, path_to_repo):
        repo_path = Path(path_to_repo)
        schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")]
        core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json")
        JsonValidator.__init__(self, core_schema, schema_dirs)
        request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" }  
        self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver)
        response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" }  
        self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver)
    
    def expand_request_response_exception (self, ex):
        if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']:
            logging.info ("Error detected validating payload oneOf - attempting explicit validation...")
            if 'RequestPayload' in instance_doc['Payload'].keys():
                ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])]))
                for r in ret_list:
                    r.path = ex.path + r.path
                return ret_list
            elif 'ResponsePayload' in instance_doc['Payload'].keys():
                ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])]))
                for r in ret_list:
                    r.path = ex.path + r.path
                return ret_list
            else:
                logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
                return [ex]
        else:
            return [ex]
        
    def expand_action_exception (self, ex):
        logging.error("Error detected in ActionRequests/ActionResponses")
        error_path = list(ex.schema_path)
        if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']:
            logging.error("Error not in inner Request/Response allOf/oneOf constraint")
            return[ex]
        j = ex.instance
        j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb
        verb = list(j.keys())[0]
        message = "Request" if error_path[1] == "ActionRequests" else "Response"
        v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver)
        ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])]))
        for r in ret_list:
            r.path = ex.path + r.path
        return ret_list
    
    def expand_object_exception (self, ex):
        logging.error("Error detected in verb")
        # The final level of validation is for the actual HI1Object validation
        if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']:
            logging.error("Error not inside HI1Object")
            return [ex]
        object_type = ex.instance['@xsi:type'].split('}')[-1]
        object_ref = {
            'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
            'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
            'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
            'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
            'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
            'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
            'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
            'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
            'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
        }[object_type]
        v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver)
        return list(v.iter_errors(ex.instance))
    
    def validate(self, instance_doc: str):
        errors = JsonValidator.validate(self, instance_doc)
        out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors]))
        return out_errors



if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
    parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
    parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
    parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document")
    parser.add_argument('--schema', default=None, help="Primary schema to validate against")    
    parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)")
    args = parser.parse_args()

    match args.verbose:
        case v if v and v >= 2:
            logging.basicConfig(level=logging.DEBUG)
        case 1:
            logging.basicConfig(level=logging.INFO)
        case _:
            logging.basicConfig(level=logging.WARNING)

    logging.debug(f"Arguments: {args}")

    if (args.ts103120):
        v = TS103120Validator("./")
    else:
        v = JsonValidator(args.schema, args.schemadir)

    logging.info(f"Taking instance doc input from {args.input.name}")
    instance_doc = json.loads(args.input.read())
    args.input.close()

    errors = v.validate(instance_doc)
    for error in errors:
        if args.printerror == 2:
            logging.error(error)
        elif args.printerror == 1:
            logging.error(f"{list(error.path)} - {error.message}")
    if len(errors) > 0:
        logging.error(f"{len(errors)} errors detected")
    else:
        logging.info(f"{len(errors)} errors detected")

    if len(errors) > 0:
        exit(-1)
    else:
        exit(0)