diff --git a/103120/examples/json/request1_signed.json b/103120/examples/json/request1_signed.json new file mode 100644 index 0000000000000000000000000000000000000000..4339ce3a7226385566db891b287a5ba631ae24f7 --- /dev/null +++ b/103120/examples/json/request1_signed.json @@ -0,0 +1,100 @@ +{ + "@xmlns": "http://uri.etsi.org/03120/common/2019/10/Core", + "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", + "@xmlns:common": "http://uri.etsi.org/03120/common/2016/02/Common", + "@xmlns:task": "http://uri.etsi.org/03120/common/2020/09/Task", + "@xmlns:auth": "http://uri.etsi.org/03120/common/2020/09/Authorisation", + "Header": { + "SenderIdentifier": { + "CountryCode": "XX", + "UniqueIdentifier": "ACTOR01" + }, + "ReceiverIdentifier": { + "CountryCode": "XX", + "UniqueIdentifier": "ACTOR02" + }, + "TransactionIdentifier": "c02358b2-76cf-4ba4-a8eb-f6436ccaea2e", + "Timestamp": "2015-09-01T12:00:00.000000Z", + "Version": { + "ETSIVersion": "V1.13.1", + "NationalProfileOwner": "XX", + "NationalProfileVersion": "v1.0" + } + }, + "Payload": { + "RequestPayload": { + "ActionRequests": { + "ActionRequest": [ + { + "ActionIdentifier": 0, + "CREATE": { + "HI1Object": { + "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Authorisation}AuthorisationObject", + "ObjectIdentifier": "7dbbc880-8750-4d3c-abe7-ea4a17646045", + "CountryCode": "XX", + "OwnerIdentifier": "ACTOR01", + "auth:AuthorisationReference": "W000001", + "auth:AuthorisationTimespan": { + "auth:StartTime": "2015-09-01T12:00:00Z", + "auth:EndTime": "2015-12-01T12:00:00Z" + } + } + } + }, + { + "ActionIdentifier": 1, + "CREATE": { + "HI1Object": { + "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Task}LITaskObject", + "ObjectIdentifier": "2b36a78b-b628-416d-bd22-404e68a0cd36", + "CountryCode": "XX", + "OwnerIdentifier": "ACTOR01", + "AssociatedObjects": { + "AssociatedObject": [ + "7dbbc880-8750-4d3c-abe7-ea4a17646045" + ] + }, + "task:Reference": "LIID1", + "task:TargetIdentifier": { + "task:TargetIdentifierValues": { + "task:TargetIdentifierValue": [ + { + "task:FormatType": { + "task:FormatOwner": "ETSI", + "task:FormatName": "InternationalE164" + }, + "task:Value": "442079460223" + } + ] + } + }, + "task:DeliveryType": { + "common:Owner": "ETSI", + "common:Name": "TaskDeliveryType", + "common:Value": "IRIandCC" + }, + "task:DeliveryDetails": { + "task:DeliveryDestination": [ + { + "task:DeliveryAddress": { + "task:IPv4Address": "192.0.2.0" + } + } + ] + }, + "task:CSPID": { + "CountryCode": "XX", + "UniqueIdentifier": "RECVER01" + } + } + } + } + ] + } + } + }, + "Signature": { + "protected": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9", + "signature": "RImkRSJkh46537Bh4LpNbkL2O64jInUv0JLGeoKJ-2M" + } +} diff --git a/103120/schema/json/ts_103120_Core.schema.json b/103120/schema/json/ts_103120_Core.schema.json index 8225cf3e1ca13734e06b6e19ad6f21116b30e757..63e2ad484c3ae7f709be33f17c0a23ec4a2872c8 100644 --- a/103120/schema/json/ts_103120_Core.schema.json +++ b/103120/schema/json/ts_103120_Core.schema.json @@ -13,9 +13,19 @@ "Payload": { "$ref": "#/$defs/MessagePayload" }, - "Signature": {}, - "xmldsig:Signature": { - "$ref": "www.w3.org_2000_09_xmldsig##/$defs/SignatureType" + "Signature": { + "properties": { + "protected": { + "type": "string" + }, + "signature": { + "type": "string" + } + }, + "required": [ + "protected", + "signature" + ] } }, "required": [ diff --git a/presigned.json b/presigned.json new file mode 100644 index 0000000000000000000000000000000000000000..16329e04161fec2058830d3f2eacc83d6454d1b7 --- /dev/null +++ b/presigned.json @@ -0,0 +1,100 @@ +{ + "@xmlns": "http://uri.etsi.org/03120/common/2019/10/Core", + "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", + "@xmlns:common": "http://uri.etsi.org/03120/common/2016/02/Common", + "@xmlns:task": "http://uri.etsi.org/03120/common/2020/09/Task", + "@xmlns:auth": "http://uri.etsi.org/03120/common/2020/09/Authorisation", + "Header": { + "SenderIdentifier": { + "CountryCode": "XX", + "UniqueIdentifier": "ACTOR01" + }, + "ReceiverIdentifier": { + "CountryCode": "XX", + "UniqueIdentifier": "ACTOR02" + }, + "TransactionIdentifier": "c02358b2-76cf-4ba4-a8eb-f6436ccaea2e", + "Timestamp": "2015-09-01T12:00:00.000000Z", + "Version": { + "ETSIVersion": "V1.13.1", + "NationalProfileOwner": "XX", + "NationalProfileVersion": "v1.0" + } + }, + "Payload": { + "RequestPayload": { + "ActionRequests": { + "ActionRequest": [ + { + "ActionIdentifier": 0, + "CREATE": { + "HI1Object": { + "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Authorisation}AuthorisationObject", + "ObjectIdentifier": "7dbbc880-8750-4d3c-abe7-ea4a17646045", + "CountryCode": "XX", + "OwnerIdentifier": "ACTOR01", + "auth:AuthorisationReference": "W000001", + "auth:AuthorisationTimespan": { + "auth:StartTime": "2015-09-01T12:00:00Z", + "auth:EndTime": "2015-12-01T12:00:00Z" + } + } + } + }, + { + "ActionIdentifier": 1, + "CREATE": { + "HI1Object": { + "@xsi:type": "{http://uri.etsi.org/03120/common/2020/09/Task}LITaskObject", + "ObjectIdentifier": "2b36a78b-b628-416d-bd22-404e68a0cd36", + "CountryCode": "XX", + "OwnerIdentifier": "ACTOR01", + "AssociatedObjects": { + "AssociatedObject": [ + "7dbbc880-8750-4d3c-abe7-ea4a17646045" + ] + }, + "task:Reference": "LIID1", + "task:TargetIdentifier": { + "task:TargetIdentifierValues": { + "task:TargetIdentifierValue": [ + { + "task:FormatType": { + "task:FormatOwner": "ETSI", + "task:FormatName": "InternationalE164" + }, + "task:Value": "442079460223" + } + ] + } + }, + "task:DeliveryType": { + "common:Owner": "ETSI", + "common:Name": "TaskDeliveryType", + "common:Value": "IRIandCC" + }, + "task:DeliveryDetails": { + "task:DeliveryDestination": [ + { + "task:DeliveryAddress": { + "task:IPv4Address": "192.0.2.0" + } + } + ] + }, + "task:CSPID": { + "CountryCode": "XX", + "UniqueIdentifier": "RECVER01" + } + } + } + } + ] + } + } + }, + "Signature": { + "protected": "", + "signature": "" + } +} \ No newline at end of file diff --git a/utils/json_validator.py b/utils/json_validator.py new file mode 100644 index 0000000000000000000000000000000000000000..fb4619384b416e0156f517c14a0b0c4f0a6817cb --- /dev/null +++ b/utils/json_validator.py @@ -0,0 +1,156 @@ +import sys +from jsonschema import validate, RefResolver, Draft202012Validator +from jsonschema.exceptions import ValidationError +import json +from pathlib import Path +import logging +import argparse +from itertools import chain + +class JsonValidator: + def __init__(self, core_schema: str, other_schemas : dict): + self._core_schema = json.load(Path(core_schema).open()) + self._schema_dict = { self._core_schema['$id'] : self._core_schema } + self._supporting_paths = [] + for thing in other_schemas: + path = Path(thing) + if path.is_dir(): + logging.debug(f"Searching {path} for schema files") + self._supporting_paths.extend(path.rglob("*.schema.json")) + else: + logging.debug(f"Appending {path} as schema file") + self._supporting_paths.append(path) + logging.info(f"Supporting schema paths: {self._supporting_paths}") + self._supporting_schemas = [json.load(p.open()) for p in self._supporting_paths] + self._schema_dict = self._schema_dict | { s['$id'] : s for s in self._supporting_schemas } + logging.info(f"Loaded schema IDs: {[k for k in self._schema_dict.keys()]}") + self._resolver = RefResolver(None, + referrer=None, + store=self._schema_dict) + logging.info("Created RefResolver") + self._validator = Draft202012Validator(self._core_schema, resolver=self._resolver) + logging.info("Created validator") + + def validate(self, instance_doc: str): + errors = list(self._validator.iter_errors(instance_doc)) + return errors + +class TS103120Validator (JsonValidator): + def __init__ (self, path_to_repo): + repo_path = Path(path_to_repo) + schema_dirs = [str(repo_path / "103120/schema/json"), str("103280/")] + core_schema = str(repo_path / "103120/schema/json/ts_103120_Core.schema.json") + JsonValidator.__init__(self, core_schema, schema_dirs) + request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" } + self._request_fragment_validator = Draft202012Validator(request_fragment_schema, resolver=self._resolver) + response_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" } + self._response_fragment_validator = Draft202012Validator(response_fragment_schema, resolver=self._resolver) + + def expand_request_response_exception (self, ex): + if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']: + logging.info ("Error detected validating payload oneOf - attempting explicit validation...") + if 'RequestPayload' in instance_doc['Payload'].keys(): + ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['RequestPayload'])])) + for r in ret_list: + r.path = ex.path + r.path + return ret_list + elif 'ResponsePayload' in instance_doc['Payload'].keys(): + ret_list = list(chain(*[self.expand_action_exception(x) for x in self._request_fragment_validator.iter_errors(instance_doc['Payload']['ResponsePayload'])])) + for r in ret_list: + r.path = ex.path + r.path + return ret_list + else: + logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?") + return [ex] + else: + return [ex] + + def expand_action_exception (self, ex): + logging.error("Error detected in ActionRequests/ActionResponses") + error_path = list(ex.schema_path) + if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']: + logging.error("Error not in inner Request/Response allOf/oneOf constraint") + return[ex] + j = ex.instance + j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb + verb = list(j.keys())[0] + message = "Request" if error_path[1] == "ActionRequests" else "Response" + v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=self._resolver) + ret_list = list(chain(*[self.expand_object_exception(x) for x in v.iter_errors(j[verb])])) + for r in ret_list: + r.path = ex.path + r.path + return ret_list + + def expand_object_exception (self, ex): + logging.error("Error detected in verb") + # The final level of validation is for the actual HI1Object validation + if list(ex.schema_path) != ['properties', 'HI1Object', 'oneOf']: + logging.error("Error not inside HI1Object") + return [ex] + object_type = ex.instance['@xsi:type'].split('}')[-1] + object_ref = { + 'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject', + 'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject', + 'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject', + 'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject', + 'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject', + 'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject', + 'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject', + 'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject', + 'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject', + }[object_type] + v = Draft202012Validator({"$ref" : object_ref}, resolver=self._resolver) + return list(v.iter_errors(ex.instance)) + + def validate(self, instance_doc: str): + errors = JsonValidator.validate(self, instance_doc) + out_errors = list(chain(*[self.expand_request_response_exception(ex) for ex in errors])) + return out_errors + + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") + parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") + parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + parser.add_argument('--ts103120', action="store_true", help="Validate a TS 103 120 JSON document") + parser.add_argument('--schema', default=None, help="Primary schema to validate against") + parser.add_argument('-p', '--printerror', action="count", help="Controls how verbose validation error printing is (can be specified multiple times)") + args = parser.parse_args() + + match args.verbose: + case v if v and v >= 2: + logging.basicConfig(level=logging.DEBUG) + case 1: + logging.basicConfig(level=logging.INFO) + case _: + logging.basicConfig(level=logging.WARNING) + + logging.debug(f"Arguments: {args}") + + if (args.ts103120): + v = TS103120Validator("./") + else: + v = JsonValidator(args.schema, args.schemadir) + + logging.info(f"Taking instance doc input from {args.input.name}") + instance_doc = json.loads(args.input.read()) + args.input.close() + + errors = v.validate(instance_doc) + for error in errors: + if args.printerror == 2: + logging.error(error) + elif args.printerror == 1: + logging.error(f"{list(error.path)} - {error.message}") + if len(errors) > 0: + logging.error(f"{len(errors)} errors detected") + else: + logging.info(f"{len(errors)} errors detected") + + if len(errors) > 0: + exit(-1) + else: + exit(0) diff --git a/utils/sign_json.py b/utils/sign_json.py new file mode 100644 index 0000000000000000000000000000000000000000..1ce0bba5b21d00ff86bcab7d680425451d0e24f8 --- /dev/null +++ b/utils/sign_json.py @@ -0,0 +1,57 @@ + +import argparse +import logging +import sys +from jose import jws +from pathlib import Path + +import json + + +def insert_sig_block (j): + j['Signature'] = { + 'protected' : '', + 'signature' : '' + } + return j + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') + parser.add_argument('--pretty', action="store_true", help='Pretty-print the JSON document before signing') + parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + args = parser.parse_args() + + match args.verbose: + case v if v and v >= 2: + logging.basicConfig(level=logging.DEBUG) + case 1: + logging.basicConfig(level=logging.INFO) + case _: + logging.basicConfig(level=logging.WARNING) + + logging.debug(f"Arguments: {args}") + + json_text = args.input.read() + args.input.close() + + j = json.loads(json_text) + j = insert_sig_block(j) + + indent = None + if args.pretty: + indent = ' ' + presigned_json_text = json.dumps(j, indent=indent) + Path('presigned.json').write_text(presigned_json_text) + presigned_json_bytes = presigned_json_text.encode('utf-8') + + signed = jws.sign(presigned_json_bytes, 'secret_key', algorithm="HS256") + components = signed.split('.') + + j['Signature']['protected'] = components[0] + j['Signature']['signature'] = components[2] + + signed_json_text = json.dumps(j, indent=indent) + print(signed_json_text) + + diff --git a/utils/translate_spec.py b/utils/translate_spec.py index eac24e496f3c5e2cf3c6640161db6563b3607044..ce7a05572deaabffdd85573c38b154fe31991b5c 100644 --- a/utils/translate_spec.py +++ b/utils/translate_spec.py @@ -10,6 +10,14 @@ from translate import * logging.basicConfig(level = logging.INFO) +json_signature_struct = { + "properties" : { + "protected" : { "type" : "string" }, + "signature" : { "type" : "string" } + }, + "required" : ["protected", "signature" ] +} + def build_schema_locations (paths): schema_locations = [] for schemaFile in paths: @@ -70,15 +78,17 @@ if __name__ == "__main__": json_schemas = {} for schema_tuple in schema_locations: logging.info(f" Translating {schema_tuple}") - if 'xmldsig' in (schema_tuple[1]): - # TODO - work out what to do here - logging.info(" Skipping XML Dsig...") + if 'skip' in ns_map[schema_tuple[0]]: + logging.info(f" Skipping {schema_tuple[0]}...") continue js = translate_schema(schema_tuple[1], ns_map, schema_locations) - # TODO - Special case, get rid of signature - if ns_map[schema_tuple[0]] == 'core.json': - js['$defs']['HI1Message']['properties'].pop('Signature') + # TODO - Special case, get rid of XML Dsig signature and insert JSON signature + if schema_tuple[0] == 'http://uri.etsi.org/03120/common/2019/10/Core': + logging.info ("Modifying signature elements") + js['$defs']['HI1Message']['properties'].pop('xmldsig:Signature') + js['$defs']['HI1Message']['properties']['Signature'] = json_signature_struct + js_path = output_path / convert_xsd_to_filename(schema_tuple[1]) # TODO - Special case - abstract HI1Object diff --git a/utils/validate_json.py b/utils/validate_json.py deleted file mode 100644 index 43f460b4eb6c5ad7191cf02ebf88417d19535c24..0000000000000000000000000000000000000000 --- a/utils/validate_json.py +++ /dev/null @@ -1,122 +0,0 @@ -import sys -from jsonschema import validate, RefResolver, Draft202012Validator -from jsonschema.exceptions import ValidationError -import json -from pathlib import Path -import logging -import argparse - - -def handle_uri(u): - print(u) - -def load_json(path : str): - with open(path) as f: - return json.load(f) - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - - parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") - parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") - parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") - parser.add_argument('schema', help="Primary schema to validate against") - - args = parser.parse_args() - - match args.verbose: - case v if v and v >= 2: - logging.basicConfig(level=logging.DEBUG) - case 1: - logging.basicConfig(level=logging.INFO) - case _: - logging.basicConfig(level=logging.WARNING) - - logging.debug(f"Arguments: {args}") - - instance_doc = json.loads(args.input.read()) - args.input.close() - main_schema = load_json(args.schema) - schema_dict = { main_schema['$id'] : main_schema } - - if args.schemadir: - schema_paths = [] - for d in args.schemadir: - logging.info(f"Searching {d}") - logging.info(list(Path(d).rglob("*.schema.json"))) - schema_paths += [f for f in Path(d).rglob("*.schema.json")] - logging.info(f"Schema files loaded: {schema_paths}") - - schemas_json = [json.load(p.open()) for p in schema_paths] - schema_dict = schema_dict | { s['$id'] : s for s in schemas_json } - - logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}") - - logging.debug (f"Instance doc: {instance_doc}") - logging.debug (f"Main schema: {main_schema}") - - resolver = RefResolver(None, - referrer=None, - store=schema_dict) - - v = Draft202012Validator(main_schema, resolver=resolver) - try: - v.validate(instance_doc) - except ValidationError as ex: - # Any failure within the Payload element results in a failure against the oneOf constraint in the Payload element - # This isn't terribly helpful in working out what is actually wrong, so in this case we attempt an explicit - # validation against the relevant oneOf alternation to try and get a more useful validation error - if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']: - logging.error ("Error detected validating payload oneOf - attempting explicit validation...") - try: - if 'RequestPayload' in instance_doc['Payload'].keys(): - request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" } - v = Draft202012Validator(request_fragment_schema, resolver=resolver) - v.validate(instance_doc['Payload']['RequestPayload']) - elif 'ResponsePayload' in instance_doc['Payload'].keys(): - request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" } - v = Draft202012Validator(request_fragment_schema, resolver=resolver) - v.validate(instance_doc['Payload']['ResponsePayload']) - else: - logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?") - raise ex - except ValidationError as ex2: - # Similar to above, this is inner validation to try and get a more useful error in the event - # that something fails the verb oneOf constraint - logging.error("Error detected in ActionRequests/ActionResponses") - error_path = list(ex2.schema_path) - if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']: - logging.error("Error not in inner Request/Response allOf/oneOf constraint") - raise ex2 - j = ex2.instance - j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb - verb = list(j.keys())[0] - message = "Request" if error_path[1] == "ActionRequests" else "Response" - v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=resolver) - try: - v.validate(j[verb]) - except ValidationError as ex3: - logging.error("Error detected in verb") - # The final level of validation is for the actual HI1Object validation - if list(ex3.schema_path) != ['properties', 'HI1Object', 'oneOf']: - logging.error("Error not inside HI1Object") - raise ex3 - object_type = ex3.instance['@xsi:type'].split('}')[-1] - object_ref = { - 'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject', - 'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject', - 'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject', - 'LPTaskObject': 'ts_103120_Task_2020_09#/$defs/LPTaskObject', - 'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject', - 'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject', - 'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject', - 'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject', - 'TrafficRuleObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject', - }[object_type] - v = Draft202012Validator({"$ref" : object_ref}, resolver=resolver) - v.validate(ex3.instance) - - exit(-1) - - - logging.info("Done") diff --git a/utils/verify_json.py b/utils/verify_json.py new file mode 100644 index 0000000000000000000000000000000000000000..329c0692499ee7fd5bc4c36b81e1cebabc3d9d68 --- /dev/null +++ b/utils/verify_json.py @@ -0,0 +1,54 @@ + +import argparse +import sys +import logging +import base64 +from jose import jws +from pathlib import Path + +import json + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') + parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + args = parser.parse_args() + + match args.verbose: + case v if v and v >= 2: + logging.basicConfig(level=logging.DEBUG) + case 1: + logging.basicConfig(level=logging.INFO) + case _: + logging.basicConfig(level=logging.WARNING) + + logging.debug(f"Arguments: {args}") + + signed_json_text = args.input.read() + args.input.close() + + j = json.loads(signed_json_text) + + protected_header = j['Signature']['protected'] + signature = j['Signature']['signature'] + + # TODO some safety checks needed here + + # Remove the newline that appears from the console + if signed_json_text.endswith('\n'): signed_json_text = signed_json_text[:-1] + signed_json_text = signed_json_text.replace(protected_header, "").replace(signature, "") + + payload_bytes = signed_json_text.encode('utf-8') + payload_token = base64.b64encode(payload_bytes).decode('ascii') + + # Un-pad the token, as per RFC7515 annex C + payload_token = payload_token.split('=')[0] + payload_token = payload_token.replace('+','-') + payload_token = payload_token.replace('/','_') + + token = protected_header + "." + payload_token + "." + signature + result = jws.verify(token, key="secret_key", algorithms=['HS256']) + + print("Signature verified") +