Loading 103120/examples/json/request1_signed.json 0 → 100644 +100 −0 Original line number Diff line number Diff line { "@xmlns": "http://uri.etsi.org/03120/common/2019/10/Core", "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", "@xmlns:common": "http://uri.etsi.org/03120/common/2016/02/Common", "@xmlns:task": "http://uri.etsi.org/03120/common/2020/09/Task", "@xmlns:auth": "http://uri.etsi.org/03120/common/2020/09/Authorisation", "Header": { "SenderIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR01" }, "ReceiverIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR02" }, "TransactionIdentifier": "c02358b2-76cf-4ba4-a8eb-f6436ccaea2e", "Timestamp": "2015-09-01T12:00:00.000000Z", "Version": { "ETSIVersion": "V1.13.1", "NationalProfileOwner": "XX", "NationalProfileVersion": "v1.0" } }, "Payload": { "RequestPayload": { "ActionRequests": { "ActionRequest": [ { "ActionIdentifier": 0, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Authorisation}AuthorisationObject", "ObjectIdentifier": "7dbbc880-8750-4d3c-abe7-ea4a17646045", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "auth:AuthorisationReference": "W000001", "auth:AuthorisationTimespan": { "auth:StartTime": "2015-09-01T12:00:00Z", "auth:EndTime": "2015-12-01T12:00:00Z" } } } }, { "ActionIdentifier": 1, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Task}LITaskObject", "ObjectIdentifier": "2b36a78b-b628-416d-bd22-404e68a0cd36", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "AssociatedObjects": { "AssociatedObject": [ "7dbbc880-8750-4d3c-abe7-ea4a17646045" ] }, "task:Reference": "LIID1", "task:TargetIdentifier": { "task:TargetIdentifierValues": { "task:TargetIdentifierValue": [ { "task:FormatType": { "task:FormatOwner": "ETSI", "task:FormatName": "InternationalE164" }, "task:Value": "442079460223" } ] } }, "task:DeliveryType": { "common:Owner": "ETSI", "common:Name": "TaskDeliveryType", "common:Value": "IRIandCC" }, "task:DeliveryDetails": { "task:DeliveryDestination": [ { "task:DeliveryAddress": { "task:IPv4Address": "192.0.2.0" } } ] }, "task:CSPID": { "CountryCode": "XX", "UniqueIdentifier": "RECVER01" } } } } ] } } }, "Signature": { "protected": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9", "signature": "RImkRSJkh46537Bh4LpNbkL2O64jInUv0JLGeoKJ-2M" } } 103120/schema/json/ts_103120_Core.schema.json +13 −3 Original line number Diff line number Diff line Loading @@ -13,9 +13,19 @@ "Payload": { "$ref": "#/$defs/MessagePayload" }, "Signature": {}, "xmldsig:Signature": { "$ref": "www.w3.org_2000_09_xmldsig##/$defs/SignatureType" "Signature": { "properties": { "protected": { "type": "string" }, "signature": { "type": "string" } }, "required": [ "protected", "signature" ] } }, "required": [ Loading presigned.json 0 → 100644 +100 −0 Original line number Diff line number Diff line { "@xmlns": "http://uri.etsi.org/03120/common/2019/10/Core", "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", "@xmlns:common": "http://uri.etsi.org/03120/common/2016/02/Common", "@xmlns:task": "http://uri.etsi.org/03120/common/2020/09/Task", "@xmlns:auth": "http://uri.etsi.org/03120/common/2020/09/Authorisation", "Header": { "SenderIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR01" }, "ReceiverIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR02" }, "TransactionIdentifier": "c02358b2-76cf-4ba4-a8eb-f6436ccaea2e", "Timestamp": "2015-09-01T12:00:00.000000Z", "Version": { "ETSIVersion": "V1.13.1", "NationalProfileOwner": "XX", "NationalProfileVersion": "v1.0" } }, "Payload": { "RequestPayload": { "ActionRequests": { "ActionRequest": [ { "ActionIdentifier": 0, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Authorisation}AuthorisationObject", "ObjectIdentifier": "7dbbc880-8750-4d3c-abe7-ea4a17646045", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "auth:AuthorisationReference": "W000001", "auth:AuthorisationTimespan": { "auth:StartTime": "2015-09-01T12:00:00Z", "auth:EndTime": "2015-12-01T12:00:00Z" } } } }, { "ActionIdentifier": 1, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Task}LITaskObject", "ObjectIdentifier": "2b36a78b-b628-416d-bd22-404e68a0cd36", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "AssociatedObjects": { "AssociatedObject": [ "7dbbc880-8750-4d3c-abe7-ea4a17646045" ] }, "task:Reference": "LIID1", "task:TargetIdentifier": { "task:TargetIdentifierValues": { "task:TargetIdentifierValue": [ { "task:FormatType": { "task:FormatOwner": "ETSI", "task:FormatName": "InternationalE164" }, "task:Value": "442079460223" } ] } }, "task:DeliveryType": { "common:Owner": "ETSI", "common:Name": "TaskDeliveryType", "common:Value": "IRIandCC" }, "task:DeliveryDetails": { "task:DeliveryDestination": [ { "task:DeliveryAddress": { "task:IPv4Address": "192.0.2.0" } } ] }, "task:CSPID": { "CountryCode": "XX", "UniqueIdentifier": "RECVER01" } } } } ] } } }, "Signature": { "protected": "", "signature": "" } } No newline at end of file utils/json_validator.py 0 → 100644 +156 −0 Original line number Diff line number Diff line import sys from jsonschema import validate, RefResolver, Draft202012Validator from jsonschema.exceptions import ValidationError import json from pathlib import Path import logging import argparse from itertools import chain class JsonValidator: def __init__(self, core_schema: str, other_schemas : dict): self._core_schema = json.load(Path(core_schema).open()) self._schema_dict = { self._core_schema['$id'] : self._core_schema } self._supporting_paths = [] for thing in other_schemas: path = Path(thing) if path.is_dir(): logging.debug(f"Searching {path} for schema files") self._supporting_paths.extend(path.rglob("*.schema.json")) else: logging.debug(f"Appending {path} as schema file") self._supporting_paths.append(path) logging.info(f"Supporting schema paths: {self._supporting_paths}") self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths] self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas } logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}") self._resolver = RefResolver(None, referrer=None, store=self._schema_dict) logging.info("Created RefResolver") self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver) logging.info("Created validator") def validate(self, instance_doc: str): errors = list(self._validator.iter_errors(instance_doc)) return errors class TS103120Validator (JsonValidator): def __init__ (self, path_to_repo): repo_path = Path(path_to_repo) schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")] core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json") JsonValidator.__init__(self, core_schema, schema_dirs) request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" } self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver) response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" } self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver) def expand_request_response_exception (self, ex): if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']: logging.info ("Error detected validating payload oneOf - attempting explicit validation...") if 'RequestPayload' in instance_doc['Payload'].keys(): ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])])) for r in ret_list: r.path = ex.path + r.path return ret_list elif 'ResponsePayload' in instance_doc['Payload'].keys(): ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])])) for r in ret_list: r.path = ex.path + r.path return ret_list else: logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?") return [ex] else: return [ex] def expand_action_exception (self, ex): logging.error("Error detected in ActionRequests/ActionResponses") error_path = list(ex.schema_path) if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']: logging.error("Error not in inner Request/Response allOf/oneOf constraint") return[ex] j = ex.instance j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb verb = list(j.keys())[0] message = "Request" if error_path[1] == "ActionRequests" else "Response" v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver) ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])])) for r in ret_list: r.path = ex.path + r.path return ret_list def expand_object_exception (self, ex): logging.error("Error detected in verb") # The final level of validation is for the actual HI1Object validation if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']: logging.error("Error not inside HI1Object") return [ex] object_type = ex.instance['@xsi:type'].split('}')[-1] object_ref = { 'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject', 'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject', 'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject', 'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject', 'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject', 'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject', 'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject', 'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject', 'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject', }[object_type] v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver) return list(v.iter_errors(ex.instance)) def validate(self, instance_doc: str): errors = JsonValidator.validate(self, instance_doc) out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors])) return out_errors if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document") parser.add_argument('--schema', default=None, help="Primary schema to validate against") parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)") args = parser.parse_args() match args.verbose: case v if v and v >= 2: logging.basicConfig(level=logging.DEBUG) case 1: logging.basicConfig(level=logging.INFO) case _: logging.basicConfig(level=logging.WARNING) logging.debug(f"Arguments: {args}") if (args.ts103120): v = TS103120Validator("./") else: v = JsonValidator(args.schema, args.schemadir) logging.info(f"Taking instance doc input from {args.input.name}") instance_doc = json.loads(args.input.read()) args.input.close() errors = v.validate(instance_doc) for error in errors: if args.printerror == 2: logging.error(error) elif args.printerror == 1: logging.error(f"{list(error.path)} - {error.message}") if len(errors) > 0: logging.error(f"{len(errors)} errors detected") else: logging.info(f"{len(errors)} errors detected") if len(errors) > 0: exit(-1) else: exit(0) utils/sign_json.py 0 → 100644 +57 −0 Original line number Diff line number Diff line import argparse import logging import sys from jose import jws from pathlib import Path import json def insert_sig_block (j): j['Signature'] = { 'protected' : '', 'signature' : '' } return j if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') parser.add_argument('--pretty', action="store_true", help='Pretty-print the JSON document before signing') parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") args = parser.parse_args() match args.verbose: case v if v and v >= 2: logging.basicConfig(level=logging.DEBUG) case 1: logging.basicConfig(level=logging.INFO) case _: logging.basicConfig(level=logging.WARNING) logging.debug(f"Arguments: {args}") json_text = args.input.read() args.input.close() j = json.loads(json_text) j = insert_sig_block(j) indent = None if args.pretty: indent = ' ' presigned_json_text = json.dumps(j, indent=indent) Path('presigned.json').write_text(presigned_json_text) presigned_json_bytes = presigned_json_text.encode('utf-8') signed = jws.sign(presigned_json_bytes, 'secret_key', algorithm="HS256") components = signed.split('.') j['Signature']['protected'] = components[0] j['Signature']['signature'] = components[2] signed_json_text = json.dumps(j, indent=indent) print(signed_json_text) Loading
103120/examples/json/request1_signed.json 0 → 100644 +100 −0 Original line number Diff line number Diff line { "@xmlns": "http://uri.etsi.org/03120/common/2019/10/Core", "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", "@xmlns:common": "http://uri.etsi.org/03120/common/2016/02/Common", "@xmlns:task": "http://uri.etsi.org/03120/common/2020/09/Task", "@xmlns:auth": "http://uri.etsi.org/03120/common/2020/09/Authorisation", "Header": { "SenderIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR01" }, "ReceiverIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR02" }, "TransactionIdentifier": "c02358b2-76cf-4ba4-a8eb-f6436ccaea2e", "Timestamp": "2015-09-01T12:00:00.000000Z", "Version": { "ETSIVersion": "V1.13.1", "NationalProfileOwner": "XX", "NationalProfileVersion": "v1.0" } }, "Payload": { "RequestPayload": { "ActionRequests": { "ActionRequest": [ { "ActionIdentifier": 0, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Authorisation}AuthorisationObject", "ObjectIdentifier": "7dbbc880-8750-4d3c-abe7-ea4a17646045", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "auth:AuthorisationReference": "W000001", "auth:AuthorisationTimespan": { "auth:StartTime": "2015-09-01T12:00:00Z", "auth:EndTime": "2015-12-01T12:00:00Z" } } } }, { "ActionIdentifier": 1, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Task}LITaskObject", "ObjectIdentifier": "2b36a78b-b628-416d-bd22-404e68a0cd36", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "AssociatedObjects": { "AssociatedObject": [ "7dbbc880-8750-4d3c-abe7-ea4a17646045" ] }, "task:Reference": "LIID1", "task:TargetIdentifier": { "task:TargetIdentifierValues": { "task:TargetIdentifierValue": [ { "task:FormatType": { "task:FormatOwner": "ETSI", "task:FormatName": "InternationalE164" }, "task:Value": "442079460223" } ] } }, "task:DeliveryType": { "common:Owner": "ETSI", "common:Name": "TaskDeliveryType", "common:Value": "IRIandCC" }, "task:DeliveryDetails": { "task:DeliveryDestination": [ { "task:DeliveryAddress": { "task:IPv4Address": "192.0.2.0" } } ] }, "task:CSPID": { "CountryCode": "XX", "UniqueIdentifier": "RECVER01" } } } } ] } } }, "Signature": { "protected": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9", "signature": "RImkRSJkh46537Bh4LpNbkL2O64jInUv0JLGeoKJ-2M" } }
103120/schema/json/ts_103120_Core.schema.json +13 −3 Original line number Diff line number Diff line Loading @@ -13,9 +13,19 @@ "Payload": { "$ref": "#/$defs/MessagePayload" }, "Signature": {}, "xmldsig:Signature": { "$ref": "www.w3.org_2000_09_xmldsig##/$defs/SignatureType" "Signature": { "properties": { "protected": { "type": "string" }, "signature": { "type": "string" } }, "required": [ "protected", "signature" ] } }, "required": [ Loading
presigned.json 0 → 100644 +100 −0 Original line number Diff line number Diff line { "@xmlns": "http://uri.etsi.org/03120/common/2019/10/Core", "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", "@xmlns:common": "http://uri.etsi.org/03120/common/2016/02/Common", "@xmlns:task": "http://uri.etsi.org/03120/common/2020/09/Task", "@xmlns:auth": "http://uri.etsi.org/03120/common/2020/09/Authorisation", "Header": { "SenderIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR01" }, "ReceiverIdentifier": { "CountryCode": "XX", "UniqueIdentifier": "ACTOR02" }, "TransactionIdentifier": "c02358b2-76cf-4ba4-a8eb-f6436ccaea2e", "Timestamp": "2015-09-01T12:00:00.000000Z", "Version": { "ETSIVersion": "V1.13.1", "NationalProfileOwner": "XX", "NationalProfileVersion": "v1.0" } }, "Payload": { "RequestPayload": { "ActionRequests": { "ActionRequest": [ { "ActionIdentifier": 0, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Authorisation}AuthorisationObject", "ObjectIdentifier": "7dbbc880-8750-4d3c-abe7-ea4a17646045", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "auth:AuthorisationReference": "W000001", "auth:AuthorisationTimespan": { "auth:StartTime": "2015-09-01T12:00:00Z", "auth:EndTime": "2015-12-01T12:00:00Z" } } } }, { "ActionIdentifier": 1, "CREATE": { "HI1Object": { "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Task}LITaskObject", "ObjectIdentifier": "2b36a78b-b628-416d-bd22-404e68a0cd36", "CountryCode": "XX", "OwnerIdentifier": "ACTOR01", "AssociatedObjects": { "AssociatedObject": [ "7dbbc880-8750-4d3c-abe7-ea4a17646045" ] }, "task:Reference": "LIID1", "task:TargetIdentifier": { "task:TargetIdentifierValues": { "task:TargetIdentifierValue": [ { "task:FormatType": { "task:FormatOwner": "ETSI", "task:FormatName": "InternationalE164" }, "task:Value": "442079460223" } ] } }, "task:DeliveryType": { "common:Owner": "ETSI", "common:Name": "TaskDeliveryType", "common:Value": "IRIandCC" }, "task:DeliveryDetails": { "task:DeliveryDestination": [ { "task:DeliveryAddress": { "task:IPv4Address": "192.0.2.0" } } ] }, "task:CSPID": { "CountryCode": "XX", "UniqueIdentifier": "RECVER01" } } } } ] } } }, "Signature": { "protected": "", "signature": "" } } No newline at end of file
utils/json_validator.py 0 → 100644 +156 −0 Original line number Diff line number Diff line import sys from jsonschema import validate, RefResolver, Draft202012Validator from jsonschema.exceptions import ValidationError import json from pathlib import Path import logging import argparse from itertools import chain class JsonValidator: def __init__(self, core_schema: str, other_schemas : dict): self._core_schema = json.load(Path(core_schema).open()) self._schema_dict = { self._core_schema['$id'] : self._core_schema } self._supporting_paths = [] for thing in other_schemas: path = Path(thing) if path.is_dir(): logging.debug(f"Searching {path} for schema files") self._supporting_paths.extend(path.rglob("*.schema.json")) else: logging.debug(f"Appending {path} as schema file") self._supporting_paths.append(path) logging.info(f"Supporting schema paths: {self._supporting_paths}") self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths] self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas } logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}") self._resolver = RefResolver(None, referrer=None, store=self._schema_dict) logging.info("Created RefResolver") self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver) logging.info("Created validator") def validate(self, instance_doc: str): errors = list(self._validator.iter_errors(instance_doc)) return errors class TS103120Validator (JsonValidator): def __init__ (self, path_to_repo): repo_path = Path(path_to_repo) schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")] core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json") JsonValidator.__init__(self, core_schema, schema_dirs) request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" } self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver) response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" } self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver) def expand_request_response_exception (self, ex): if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']: logging.info ("Error detected validating payload oneOf - attempting explicit validation...") if 'RequestPayload' in instance_doc['Payload'].keys(): ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])])) for r in ret_list: r.path = ex.path + r.path return ret_list elif 'ResponsePayload' in instance_doc['Payload'].keys(): ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])])) for r in ret_list: r.path = ex.path + r.path return ret_list else: logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?") return [ex] else: return [ex] def expand_action_exception (self, ex): logging.error("Error detected in ActionRequests/ActionResponses") error_path = list(ex.schema_path) if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']: logging.error("Error not in inner Request/Response allOf/oneOf constraint") return[ex] j = ex.instance j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb verb = list(j.keys())[0] message = "Request" if error_path[1] == "ActionRequests" else "Response" v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver) ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])])) for r in ret_list: r.path = ex.path + r.path return ret_list def expand_object_exception (self, ex): logging.error("Error detected in verb") # The final level of validation is for the actual HI1Object validation if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']: logging.error("Error not inside HI1Object") return [ex] object_type = ex.instance['@xsi:type'].split('}')[-1] object_ref = { 'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject', 'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject', 'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject', 'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject', 'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject', 'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject', 'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject', 'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject', 'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject', }[object_type] v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver) return list(v.iter_errors(ex.instance)) def validate(self, instance_doc: str): errors = JsonValidator.validate(self, instance_doc) out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors])) return out_errors if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document") parser.add_argument('--schema', default=None, help="Primary schema to validate against") parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)") args = parser.parse_args() match args.verbose: case v if v and v >= 2: logging.basicConfig(level=logging.DEBUG) case 1: logging.basicConfig(level=logging.INFO) case _: logging.basicConfig(level=logging.WARNING) logging.debug(f"Arguments: {args}") if (args.ts103120): v = TS103120Validator("./") else: v = JsonValidator(args.schema, args.schemadir) logging.info(f"Taking instance doc input from {args.input.name}") instance_doc = json.loads(args.input.read()) args.input.close() errors = v.validate(instance_doc) for error in errors: if args.printerror == 2: logging.error(error) elif args.printerror == 1: logging.error(f"{list(error.path)} - {error.message}") if len(errors) > 0: logging.error(f"{len(errors)} errors detected") else: logging.info(f"{len(errors)} errors detected") if len(errors) > 0: exit(-1) else: exit(0)
utils/sign_json.py 0 → 100644 +57 −0 Original line number Diff line number Diff line import argparse import logging import sys from jose import jws from pathlib import Path import json def insert_sig_block (j): j['Signature'] = { 'protected' : '', 'signature' : '' } return j if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') parser.add_argument('--pretty', action="store_true", help='Pretty-print the JSON document before signing') parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") args = parser.parse_args() match args.verbose: case v if v and v >= 2: logging.basicConfig(level=logging.DEBUG) case 1: logging.basicConfig(level=logging.INFO) case _: logging.basicConfig(level=logging.WARNING) logging.debug(f"Arguments: {args}") json_text = args.input.read() args.input.close() j = json.loads(json_text) j = insert_sig_block(j) indent = None if args.pretty: indent = ' ' presigned_json_text = json.dumps(j, indent=indent) Path('presigned.json').write_text(presigned_json_text) presigned_json_bytes = presigned_json_text.encode('utf-8') signed = jws.sign(presigned_json_bytes, 'secret_key', algorithm="HS256") components = signed.split('.') j['Signature']['protected'] = components[0] j['Signature']['signature'] = components[2] signed_json_text = json.dumps(j, indent=indent) print(signed_json_text)