import sys from jsonschema import validate, RefResolver, Draft202012Validator from jsonschema.exceptions import ValidationError import json from pathlib import Path import logging import argparse from itertools import chain class JsonValidator: def __init__(self, core_schema: str, other_schemas : dict): self._core_schema = json.load(Path(core_schema).open()) self._schema_dict = { self._core_schema['$id'] : self._core_schema } self._supporting_paths = [] for thing in other_schemas: path = Path(thing) if path.is_dir(): logging.debug(f"Searching {path} for schema files") self._supporting_paths.extend(path.rglob("*.schema.json")) else: logging.debug(f"Appending {path} as schema file") self._supporting_paths.append(path) logging.info(f"Supporting schema paths: {self._supporting_paths}") self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths] self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas } logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}") self._resolver = RefResolver(None, referrer=None, store=self._schema_dict) logging.info("Created RefResolver") self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver) logging.info("Created validator") def validate(self, instance_doc: str): errors = list(self._validator.iter_errors(instance_doc)) return errors class TS103120Validator (JsonValidator): def __init__ (self, path_to_repo): repo_path = Path(path_to_repo) schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")] core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json") JsonValidator.__init__(self, core_schema, schema_dirs) request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" } self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver) response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" } self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver) def expand_request_response_exception (self, ex): if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']: logging.info ("Error detected validating payload oneOf - attempting explicit validation...") if 'RequestPayload' in instance_doc['Payload'].keys(): ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])])) for r in ret_list: r.path = ex.path + r.path return ret_list elif 'ResponsePayload' in instance_doc['Payload'].keys(): ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])])) for r in ret_list: r.path = ex.path + r.path return ret_list else: logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?") return [ex] else: return [ex] def expand_action_exception (self, ex): logging.error("Error detected in ActionRequests/ActionResponses") error_path = list(ex.schema_path) if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']: logging.error("Error not in inner Request/Response allOf/oneOf constraint") return[ex] j = ex.instance j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb verb = list(j.keys())[0] message = "Request" if error_path[1] == "ActionRequests" else "Response" v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver) ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])])) for r in ret_list: r.path = ex.path + r.path return ret_list def expand_object_exception (self, ex): logging.error("Error detected in verb") # The final level of validation is for the actual HI1Object validation if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']: logging.error("Error not inside HI1Object") return [ex] object_type = ex.instance['@xsi:type'].split('}')[-1] object_ref = { 'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject', 'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject', 'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject', 'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject', 'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject', 'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject', 'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject', 'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject', 'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject', }[object_type] v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver) return list(v.iter_errors(ex.instance)) def validate(self, instance_doc: str): errors = JsonValidator.validate(self, instance_doc) out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors])) return out_errors if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document") parser.add_argument('--schema', default=None, help="Primary schema to validate against") parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)") args = parser.parse_args() match args.verbose: case v if v and v >= 2: logging.basicConfig(level=logging.DEBUG) case 1: logging.basicConfig(level=logging.INFO) case _: logging.basicConfig(level=logging.WARNING) logging.debug(f"Arguments: {args}") if (args.ts103120): v = TS103120Validator("./") else: v = JsonValidator(args.schema, args.schemadir) logging.info(f"Taking instance doc input from {args.input.name}") instance_doc = json.loads(args.input.read()) args.input.close() errors = v.validate(instance_doc) for error in errors: if args.printerror == 2: logging.error(error) elif args.printerror == 1: logging.error(f"{list(error.path)} - {error.message}") if len(errors) > 0: logging.error(f"{len(errors)} errors detected") else: logging.info(f"{len(errors)} errors detected") if len(errors) > 0: exit(-1) else: exit(0)