Newer
Older
import sys
from jsonschema import validate, RefResolver, Draft202012Validator
from jsonschema.exceptions import ValidationError
import json
from pathlib import Path
import logging
import argparse
def handle_uri(u):
print(u)
def load_json(path : str):
with open(path) as f:
return json.load(f)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
parser.add_argument('schema', help="Primary schema to validate against")
args = parser.parse_args()
match args.verbose:
case v if v and v >= 2:
logging.basicConfig(level=logging.DEBUG)
case 1:
logging.basicConfig(level=logging.INFO)
case _:
logging.basicConfig(level=logging.WARNING)
logging.debug(f"Arguments: {args}")
instance_doc = json.loads(args.input.read())
args.input.close()
main_schema = load_json(args.schema)
schema_dict = { main_schema['$id'] : main_schema }
if args.schemadir:
schema_paths = []
for d in args.schemadir:
logging.info(f"Searching {d}")
logging.info(list(Path(d).rglob("*.schema.json")))
schema_paths += [f for f in Path(d).rglob("*.schema.json")]
logging.info(f"Schema files loaded: {schema_paths}")
schemas_json = [json.load(p.open()) for p in schema_paths]
schema_dict = schema_dict | { s['$id'] : s for s in schemas_json }
logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}")
logging.debug (f"Instance doc: {instance_doc}")
logging.debug (f"Main schema: {main_schema}")
resolver = RefResolver(None,
referrer=None,
store=schema_dict)
v = Draft202012Validator(main_schema, resolver=resolver)
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
try:
v.validate(instance_doc)
except ValidationError as ex:
# Any failure within the Payload element results in a failure against the oneOf constraint in the Payload element
# This isn't terribly helpful in working out what is actually wrong, so in this case we attempt an explicit
# validation against the relevant oneOf alternation to try and get a more useful validation error
if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']:
logging.error ("Error detected validating payload oneOf - attempting explicit validation...")
try:
if 'RequestPayload' in instance_doc['Payload'].keys():
request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" }
v = Draft202012Validator(request_fragment_schema, resolver=resolver)
v.validate(instance_doc['Payload']['RequestPayload'])
elif 'ResponsePayload' in instance_doc['Payload'].keys():
request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" }
v = Draft202012Validator(request_fragment_schema, resolver=resolver)
v.validate(instance_doc['Payload']['ResponsePayload'])
else:
logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
raise ex
except ValidationError as ex2:
# Similar to above, this is inner validation to try and get a more useful error in the event
# that something fails the verb oneOf constraint
logging.error("Error detected in ActionRequests/ActionResponses")
error_path = list(ex2.schema_path)
if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']:
logging.error("Error not in inner Request/Response allOf/oneOf constraint")
raise ex2
j = ex2.instance
j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb
verb = list(j.keys())[0]
message = "Request" if error_path[1] == "ActionRequests" else "Response"
v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=resolver)
try:
v.validate(j[verb])
except ValidationError as ex3:
logging.error("Error detected in verb")
# The final level of validation is for the actual HI1Object validation
if list(ex3.schema_path) != ['properties', 'HI1Object', 'oneOf']:
logging.error("Error not inside HI1Object")
raise ex3
object_type = ex3.instance['@xsi:type'].split('}')[-1]
object_ref = {
'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
}[object_type]
v = Draft202012Validator({"$ref" : object_ref}, resolver=resolver)
v.validate(ex3.instance)
exit(-1)