Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import sys
from jsonschema import validate, RefResolver, Draft202012Validator
from jsonschema.exceptions import ValidationError
import json
from pathlib import Path
import logging
import argparse
from itertools import chain
class JsonValidator:
def __init__(self, core_schema: str, other_schemas : dict):
self._core_schema = json.load(Path(core_schema).open())
self._schema_dict = { self._core_schema['$id'] : self._core_schema }
self._supporting_paths = []
for thing in other_schemas:
path = Path(thing)
if path.is_dir():
logging.debug(f"Searching {path} for schema files")
self._supporting_paths.extend(path.rglob("*.schema.json"))
else:
logging.debug(f"Appending {path} as schema file")
self._supporting_paths.append(path)
logging.info(f"Supporting schema paths: {self._supporting_paths}")
self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths]
self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas }
logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}")
self._resolver = RefResolver(None,
referrer=None,
store=self._schema_dict)
logging.info("Created RefResolver")
self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver)
logging.info("Created validator")
def validate(self, instance_doc: str):
errors = list(self._validator.iter_errors(instance_doc))
return errors
class TS103120Validator (JsonValidator):
def __init__ (self, path_to_repo):
repo_path = Path(path_to_repo)
schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")]
core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json")
JsonValidator.__init__(self, core_schema, schema_dirs)
request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" }
self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver)
response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" }
self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver)
def expand_request_response_exception (self, ex):
if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']:
logging.info ("Error detected validating payload oneOf - attempting explicit validation...")
if 'RequestPayload' in instance_doc['Payload'].keys():
ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])]))
for r in ret_list:
r.path = ex.path + r.path
return ret_list
elif 'ResponsePayload' in instance_doc['Payload'].keys():
ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])]))
for r in ret_list:
r.path = ex.path + r.path
return ret_list
else:
logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
return [ex]
else:
return [ex]
def expand_action_exception (self, ex):
logging.error("Error detected in ActionRequests/ActionResponses")
error_path = list(ex.schema_path)
if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']:
logging.error("Error not in inner Request/Response allOf/oneOf constraint")
return[ex]
j = ex.instance
j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb
verb = list(j.keys())[0]
message = "Request" if error_path[1] == "ActionRequests" else "Response"
v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver)
ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])]))
for r in ret_list:
r.path = ex.path + r.path
return ret_list
def expand_object_exception (self, ex):
logging.error("Error detected in verb")
# The final level of validation is for the actual HI1Object validation
if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']:
logging.error("Error not inside HI1Object")
return [ex]
object_type = ex.instance['@xsi:type'].split('}')[-1]
object_ref = {
'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
}[object_type]
v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver)
return list(v.iter_errors(ex.instance))
def validate(self, instance_doc: str):
errors = JsonValidator.validate(self, instance_doc)
out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors]))
return out_errors
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document")
parser.add_argument('--schema', default=None, help="Primary schema to validate against")
parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)")
args = parser.parse_args()
match args.verbose:
case v if v and v >= 2:
logging.basicConfig(level=logging.DEBUG)
case 1:
logging.basicConfig(level=logging.INFO)
case _:
logging.basicConfig(level=logging.WARNING)
logging.debug(f"Arguments: {args}")
if (args.ts103120):
v = TS103120Validator("./")
else:
v = JsonValidator(args.schema, args.schemadir)
logging.info(f"Taking instance doc input from {args.input.name}")
instance_doc = json.loads(args.input.read())
args.input.close()
errors = v.validate(instance_doc)
for error in errors:
if args.printerror == 2:
logging.error(error)
elif args.printerror == 1:
logging.error(f"{list(error.path)} - {error.message}")
if len(errors) > 0:
logging.error(f"{len(errors)} errors detected")
else:
logging.info(f"{len(errors)} errors detected")
if len(errors) > 0:
exit(-1)
else:
exit(0)