Skip to content
...@@ -51,5 +51,19 @@ ...@@ -51,5 +51,19 @@
"exampleFiles" : [ "exampleFiles" : [
"103120/examples/xml" "103120/examples/xml"
] ]
} },
{
"coreSchema" : "103707/TS_103_707.xsd",
"supportingSchemas" : [
"103280/TS_103_280.xsd",
"103120/schema/xsd/ts_103120_Common.xsd",
"103120/schema/xsd/ts_103120_Core.xsd",
"103120/schema/xsd/ts_103120_Task.xsd",
"testing/deps/xmldsig/xmldsig-core-schema.xsd",
"103707/examples/FooServiceSchema.xsd"
],
"exampleFiles" : [
"103707/examples"
]
}
] ]
import sys
from jsonschema import validate, RefResolver, Draft202012Validator
from jsonschema.exceptions import ValidationError
import json
from pathlib import Path
import logging
import argparse
from itertools import chain
class JsonValidator:
def __init__(self, core_schema: str, other_schemas : dict):
self._core_schema = json.load(Path(core_schema).open())
self._schema_dict = { self._core_schema['$id'] : self._core_schema }
self._supporting_paths = []
for thing in other_schemas:
path = Path(thing)
if path.is_dir():
logging.debug(f"Searching {path} for schema files")
self._supporting_paths.extend(path.rglob("*.schema.json"))
else:
logging.debug(f"Appending {path} as schema file")
self._supporting_paths.append(path)
logging.info(f"Supporting schema paths: {self._supporting_paths}")
self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths]
self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas }
logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}")
self._resolver = RefResolver(None,
referrer=None,
store=self._schema_dict)
logging.info("Created RefResolver")
self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver)
logging.info("Created validator")
def validate(self, instance_doc: str):
errors = list(self._validator.iter_errors(instance_doc))
return errors
class TS103120Validator (JsonValidator):
def __init__ (self, path_to_repo):
repo_path = Path(path_to_repo)
schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")]
core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json")
JsonValidator.__init__(self, core_schema, schema_dirs)
request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" }
self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver)
response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" }
self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver)
def expand_request_response_exception (self, ex):
if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']:
logging.info ("Error detected validating payload oneOf - attempting explicit validation...")
if 'RequestPayload' in instance_doc['Payload'].keys():
ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])]))
for r in ret_list:
r.path = ex.path + r.path
return ret_list
elif 'ResponsePayload' in instance_doc['Payload'].keys():
ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])]))
for r in ret_list:
r.path = ex.path + r.path
return ret_list
else:
logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
return [ex]
else:
return [ex]
def expand_action_exception (self, ex):
logging.error("Error detected in ActionRequests/ActionResponses")
error_path = list(ex.schema_path)
if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']:
logging.error("Error not in inner Request/Response allOf/oneOf constraint")
return[ex]
j = ex.instance
j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb
verb = list(j.keys())[0]
message = "Request" if error_path[1] == "ActionRequests" else "Response"
v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver)
ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])]))
for r in ret_list:
r.path = ex.path + r.path
return ret_list
def expand_object_exception (self, ex):
logging.error("Error detected in verb")
# The final level of validation is for the actual HI1Object validation
if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']:
logging.error("Error not inside HI1Object")
return [ex]
object_type = ex.instance['@xsi:type'].split('}')[-1]
object_ref = {
'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
}[object_type]
v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver)
return list(v.iter_errors(ex.instance))
def validate(self, instance_doc: str):
errors = JsonValidator.validate(self, instance_doc)
out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors]))
return out_errors
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document")
parser.add_argument('--schema', default=None, help="Primary schema to validate against")
parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)")
args = parser.parse_args()
match args.verbose:
case v if v and v >= 2:
logging.basicConfig(level=logging.DEBUG)
case 1:
logging.basicConfig(level=logging.INFO)
case _:
logging.basicConfig(level=logging.WARNING)
logging.debug(f"Arguments: {args}")
if (args.ts103120):
v = TS103120Validator("./")
else:
v = JsonValidator(args.schema, args.schemadir)
logging.info(f"Taking instance doc input from {args.input.name}")
instance_doc = json.loads(args.input.read())
args.input.close()
errors = v.validate(instance_doc)
for error in errors:
if args.printerror == 2:
logging.error(error)
elif args.printerror == 1:
logging.error(f"{list(error.path)} - {error.message}")
if len(errors) > 0:
logging.error(f"{len(errors)} errors detected")
else:
logging.info(f"{len(errors)} errors detected")
if len(errors) > 0:
exit(-1)
else:
exit(0)
import argparse
import logging
import sys
from jose import jws
from pathlib import Path
import json
def insert_sig_block (j):
j['Signature'] = {
'protected' : '',
'signature' : ''
}
return j
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)')
parser.add_argument('--pretty', action="store_true", help='Pretty-print the JSON document before signing')
parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
args = parser.parse_args()
match args.verbose:
case v if v and v >= 2:
logging.basicConfig(level=logging.DEBUG)
case 1:
logging.basicConfig(level=logging.INFO)
case _:
logging.basicConfig(level=logging.WARNING)
logging.debug(f"Arguments: {args}")
json_text = args.input.read()
args.input.close()
j = json.loads(json_text)
j = insert_sig_block(j)
indent = None
if args.pretty:
indent = ' '
presigned_json_text = json.dumps(j, indent=indent)
Path('presigned.json').write_text(presigned_json_text)
presigned_json_bytes = presigned_json_text.encode('utf-8')
signed = jws.sign(presigned_json_bytes, 'secret_key', algorithm="HS256")
components = signed.split('.')
j['Signature']['protected'] = components[0]
j['Signature']['signature'] = components[2]
signed_json_text = json.dumps(j, indent=indent)
print(signed_json_text)
...@@ -10,6 +10,14 @@ from translate import * ...@@ -10,6 +10,14 @@ from translate import *
logging.basicConfig(level = logging.INFO) logging.basicConfig(level = logging.INFO)
json_signature_struct = {
"properties" : {
"protected" : { "type" : "string" },
"signature" : { "type" : "string" }
},
"required" : ["protected", "signature" ]
}
def build_schema_locations (paths): def build_schema_locations (paths):
schema_locations = [] schema_locations = []
for schemaFile in paths: for schemaFile in paths:
...@@ -70,15 +78,17 @@ if __name__ == "__main__": ...@@ -70,15 +78,17 @@ if __name__ == "__main__":
json_schemas = {} json_schemas = {}
for schema_tuple in schema_locations: for schema_tuple in schema_locations:
logging.info(f" Translating {schema_tuple}") logging.info(f" Translating {schema_tuple}")
if 'xmldsig' in (schema_tuple[1]): if 'skip' in ns_map[schema_tuple[0]]:
# TODO - work out what to do here logging.info(f" Skipping {schema_tuple[0]}...")
logging.info(" Skipping XML Dsig...")
continue continue
js = translate_schema(schema_tuple[1], ns_map, schema_locations) js = translate_schema(schema_tuple[1], ns_map, schema_locations)
# TODO - Special case, get rid of signature # TODO - Special case, get rid of XML Dsig signature and insert JSON signature
if ns_map[schema_tuple[0]] == 'core.json': if schema_tuple[0] == 'http://uri.etsi.org/03120/common/2019/10/Core':
js['$defs']['HI1Message']['properties'].pop('Signature') logging.info ("Modifying signature elements")
js['$defs']['HI1Message']['properties'].pop('xmldsig:Signature')
js['$defs']['HI1Message']['properties']['Signature'] = json_signature_struct
js_path = output_path / convert_xsd_to_filename(schema_tuple[1]) js_path = output_path / convert_xsd_to_filename(schema_tuple[1])
# TODO - Special case - abstract HI1Object # TODO - Special case - abstract HI1Object
......
{
"schemas" : {
"./103280/TS_103_280.xsd" : {
"prefix" : "etsi280"
}
},
"output" : "./103280/"
}
import sys
from jsonschema import validate, RefResolver, Draft202012Validator
from jsonschema.exceptions import ValidationError
import json
from pathlib import Path
import logging
import argparse
def handle_uri(u):
print(u)
def load_json(path : str):
with open(path) as f:
return json.load(f)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation")
parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)")
parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
parser.add_argument('schema', help="Primary schema to validate against")
args = parser.parse_args()
match args.verbose:
case v if v and v >= 2:
logging.basicConfig(level=logging.DEBUG)
case 1:
logging.basicConfig(level=logging.INFO)
case _:
logging.basicConfig(level=logging.WARNING)
logging.debug(f"Arguments: {args}")
instance_doc = json.loads(args.input.read())
args.input.close()
main_schema = load_json(args.schema)
schema_dict = { main_schema['$id'] : main_schema }
if args.schemadir:
schema_paths = []
for d in args.schemadir:
logging.info(f"Searching {d}")
logging.info(list(Path(d).rglob("*.schema.json")))
schema_paths += [f for f in Path(d).rglob("*.schema.json")]
logging.info(f"Schema files loaded: {schema_paths}")
schemas_json = [json.load(p.open()) for p in schema_paths]
schema_dict = schema_dict | { s['$id'] : s for s in schemas_json }
logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}")
logging.debug (f"Instance doc: {instance_doc}")
logging.debug (f"Main schema: {main_schema}")
resolver = RefResolver(None,
referrer=None,
store=schema_dict)
v = Draft202012Validator(main_schema, resolver=resolver)
try:
v.validate(instance_doc)
except ValidationError as ex:
# Any failure within the Payload element results in a failure against the oneOf constraint in the Payload element
# This isn't terribly helpful in working out what is actually wrong, so in this case we attempt an explicit
# validation against the relevant oneOf alternation to try and get a more useful validation error
if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']:
logging.error ("Error detected validating payload oneOf - attempting explicit validation...")
try:
if 'RequestPayload' in instance_doc['Payload'].keys():
request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" }
v = Draft202012Validator(request_fragment_schema, resolver=resolver)
v.validate(instance_doc['Payload']['RequestPayload'])
elif 'ResponsePayload' in instance_doc['Payload'].keys():
request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" }
v = Draft202012Validator(request_fragment_schema, resolver=resolver)
v.validate(instance_doc['Payload']['ResponsePayload'])
else:
logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?")
raise ex
except ValidationError as ex2:
# Similar to above, this is inner validation to try and get a more useful error in the event
# that something fails the verb oneOf constraint
logging.error("Error detected in ActionRequests/ActionResponses")
error_path = list(ex2.schema_path)
if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']:
logging.error("Error not in inner Request/Response allOf/oneOf constraint")
raise ex2
j = ex2.instance
j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb
verb = list(j.keys())[0]
message = "Request" if error_path[1] == "ActionRequests" else "Response"
v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=resolver)
try:
v.validate(j[verb])
except ValidationError as ex3:
logging.error("Error detected in verb")
# The final level of validation is for the actual HI1Object validation
if list(ex3.schema_path) != ['properties', 'HI1Object', 'oneOf']:
logging.error("Error not inside HI1Object")
raise ex3
object_type = ex3.instance['@xsi:type'].split('}')[-1]
object_ref = {
'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject',
'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject',
'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject',
'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject',
'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject',
'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject',
'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject',
'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject',
'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject',
}[object_type]
v = Draft202012Validator({"$ref" : object_ref}, resolver=resolver)
v.validate(ex3.instance)
exit(-1)
logging.info("Done")
import argparse
import sys
import logging
import base64
from jose import jws
from pathlib import Path
import json
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)')
parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)")
args = parser.parse_args()
match args.verbose:
case v if v and v >= 2:
logging.basicConfig(level=logging.DEBUG)
case 1:
logging.basicConfig(level=logging.INFO)
case _:
logging.basicConfig(level=logging.WARNING)
logging.debug(f"Arguments: {args}")
signed_json_text = args.input.read()
args.input.close()
j = json.loads(signed_json_text)
protected_header = j['Signature']['protected']
signature = j['Signature']['signature']
# TODO some safety checks needed here
# Remove the newline that appears from the console
if signed_json_text.endswith('\n'): signed_json_text = signed_json_text[:-1]
signed_json_text = signed_json_text.replace(protected_header, "").replace(signature, "")
payload_bytes = signed_json_text.encode('utf-8')
payload_token = base64.b64encode(payload_bytes).decode('ascii')
# Un-pad the token, as per RFC7515 annex C
payload_token = payload_token.split('=')[0]
payload_token = payload_token.replace('+','-')
payload_token = payload_token.replace('/','_')
token = protected_header + "." + payload_token + "." + signature
result = jws.verify(token, key="secret_key", algorithms=['HS256'])
print("Signature verified")