diff --git a/103280/TS_103_280.schema.json b/103280/TS_103_280.schema.json new file mode 100644 index 0000000000000000000000000000000000000000..02cd078e139e4896bad9cd8efcb2cfc7ae1a070a --- /dev/null +++ b/103280/TS_103_280.schema.json @@ -0,0 +1,389 @@ +{ + "$id": "ts_103280_2017_07", + "$defs": { + "ShortString": { + "type": "string", + "maxLength": 255 + }, + "LongString": { + "type": "string", + "maxLength": 65535 + }, + "LIID": { + "type": "string", + "pattern": "^([!-~]{1,25})|([0-9a-f]{26,50})$" + }, + "UTCDateTime": { + "type": "string", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" + }, + "UTCMicrosecondDateTime": { + "type": "string", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{6}Z$" + }, + "QualifiedDateTime": { + "type": "string", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(Z|[+-][0-9]{2}:[0-9]{2})$" + }, + "QualifiedMicrosecondDateTime": { + "type": "string", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{6}(Z|[+-][0-9]{2}:[0-9]{2})$" + }, + "InternationalE164": { + "type": "string", + "pattern": "^[0-9]{1,15}$" + }, + "IMSI": { + "type": "string", + "pattern": "^[0-9]{6,15}$" + }, + "IMEI": { + "type": "string", + "pattern": "^[0-9]{14}$" + }, + "IMEICheckDigit": { + "type": "string", + "pattern": "^[0-9]{15}$" + }, + "IMEISV": { + "type": "string", + "pattern": "^[0-9]{16}$" + }, + "IPv4Address": { + "type": "string", + "pattern": "^((25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])$" + }, + "IPv4CIDR": { + "type": "string", + "pattern": "^((25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])/([1-2]?[0-9]|3[0-2])$" + }, + "IPv6Address": { + "type": "string", + "pattern": "^([0-9a-f]{4}:){7}([0-9a-f]{4})$" + }, + "IPv6CIDR": { + "type": "string", + "pattern": "^([0-9a-f]{4}:){7}([0-9a-f]{4})/(([1-9][0-9]?)|(1[0-1][0-9])|(12[0-8]))$" + }, + "TCPPort": { + "type": "integer", + "exclusiveMinimum": 1, + "maximum": 65535 + }, + "UDPPort": { + "type": "integer", + "minimum": 0, + "maximum": 65535 + }, + "MACAddress": { + "type": "string", + "pattern": "^([a-f0-9]{2}:){5}[a-f0-9]{2}$" + }, + "EmailAddress": { + "allOf": [ + { + "$ref": "#/$defs/ShortString" + }, + { + "type": "string", + "pattern": "^[a-zA-Z0-9\\.!#$%&'\\*\\+\\\\/=\\?\\^_`\\{\\|\\}~\\-]+@[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$" + } + ] + }, + "UUID": { + "type": "string", + "pattern": "^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$" + }, + "ISOCountryCode": { + "type": "string", + "pattern": "^[A-Z]{2}$" + }, + "SIPURI": { + "type": "string", + "pattern": "^sips?:[a-zA-Z0-9!#$&-;=?-\\[\\]_~%]+$" + }, + "TELURI": { + "type": "string", + "pattern": "^tel:[a-zA-Z0-9!#$&-;=?-\\[\\]_~%]+$" + }, + "WGS84LatitudeDecimal": { + "type": "string", + "pattern": "^[NS][0-9]{2}\\.[0-9]{6}$" + }, + "WGS84LongitudeDecimal": { + "type": "string", + "pattern": "^[EW][0-9]{3}\\.[0-9]{6}$" + }, + "WGS84LatitudeAngular": { + "type": "string", + "pattern": "^[NS][0-9]{6}\\.[0-9]{2}$" + }, + "WGS84LongitudeAngular": { + "type": "string", + "pattern": "^[EW][0-9]{7}\\.[0-9]{2}$" + }, + "SUPIIMSI": { + "$ref": "#/$defs/IMSI" + }, + "SUPINAI": { + "$ref": "#/$defs/NAI" + }, + "SUCI": { + "type": "string", + "pattern": "^([a-fA-F0-9]{2})*$" + }, + "PEIIMEI": { + "$ref": "#/$defs/IMEI" + }, + "PEIIMEICheckDigit": { + "$ref": "#/$defs/IMEICheckDigit" + }, + "PEIIMEISV": { + "$ref": "#/$defs/IMEISV" + }, + "GPSIMSISDN": { + "type": "string", + "pattern": "^[0-9]{1,15}$" + }, + "GPSINAI": { + "$ref": "#/$defs/NAI" + }, + "NAI": { + "type": "string" + }, + "LDID": { + "type": "string", + "pattern": "^([A-Z]{2}-.+-.+)$" + }, + "InternationalizedEmailAddress": { + "allOf": [ + { + "$ref": "#/$defs/ShortString" + }, + { + "type": "string", + "pattern": "^.+@.+$" + } + ] + }, + "EUI64": { + "type": "string", + "pattern": "^([a-f0-9]{2}:){7}[a-f0-9]{2}$" + }, + "CGI": { + "type": "string", + "pattern": "^[0-9]{3}-[0-9]{2,3}-[a-f0-9]{4}-[a-f0-9]{4}$" + }, + "ECGI": { + "type": "string", + "pattern": "^[0-9]{3}-[0-9]{2,3}-[a-f0-9]{7}$" + }, + "NCGI": { + "type": "string", + "pattern": "^[0-9]{3}-[0-9]{2,3}-[a-f0-9]{9}$" + }, + "ICCID": { + "type": "string", + "pattern": "^[0-9]{19,20}$" + }, + "IPProtocol": { + "type": "integer", + "minimum": 0, + "maximum": 255 + }, + "IPAddress": { + "oneOf": [ + { + "type": "object", + "properties": { + "etsi280:IPv4Address": { + "$ref": "#/$defs/IPv4Address" + } + }, + "required": [ + "etsi280:IPv4Address" + ] + }, + { + "type": "object", + "properties": { + "etsi280:IPv6Address": { + "$ref": "#/$defs/IPv6Address" + } + }, + "required": [ + "etsi280:IPv6Address" + ] + } + ] + }, + "IPCIDR": { + "oneOf": [ + { + "type": "object", + "properties": { + "etsi280:IPv4CIDR": { + "$ref": "#/$defs/IPv4CIDR" + } + }, + "required": [ + "etsi280:IPv4CIDR" + ] + }, + { + "type": "object", + "properties": { + "etsi280:IPv6CIDR": { + "$ref": "#/$defs/IPv6CIDR" + } + }, + "required": [ + "etsi280:IPv6CIDR" + ] + } + ] + }, + "TCPPortRange": { + "type": "object", + "properties": { + "etsi280:start": { + "$ref": "#/$defs/TCPPort" + }, + "etsi280:end": { + "$ref": "#/$defs/TCPPort" + } + }, + "required": [ + "etsi280:start", + "etsi280:end" + ] + }, + "UDPPortRange": { + "type": "object", + "properties": { + "etsi280:start": { + "$ref": "#/$defs/UDPPort" + }, + "etsi280:end": { + "$ref": "#/$defs/UDPPort" + } + }, + "required": [ + "etsi280:start", + "etsi280:end" + ] + }, + "Port": { + "oneOf": [ + { + "type": "object", + "properties": { + "etsi280:TCPPort": { + "$ref": "#/$defs/TCPPort" + } + }, + "required": [ + "etsi280:TCPPort" + ] + }, + { + "type": "object", + "properties": { + "etsi280:UDPPort": { + "$ref": "#/$defs/UDPPort" + } + }, + "required": [ + "etsi280:UDPPort" + ] + } + ] + }, + "PortRange": { + "oneOf": [ + { + "type": "object", + "properties": { + "etsi280:TCPPortRange": { + "$ref": "#/$defs/TCPPortRange" + } + }, + "required": [ + "etsi280:TCPPortRange" + ] + }, + { + "type": "object", + "properties": { + "etsi280:UDPPortRange": { + "$ref": "#/$defs/UDPPortRange" + } + }, + "required": [ + "etsi280:UDPPortRange" + ] + } + ] + }, + "IPAddressPort": { + "type": "object", + "properties": { + "etsi280:address": { + "$ref": "#/$defs/IPAddress" + }, + "etsi280:port": { + "$ref": "#/$defs/Port" + } + }, + "required": [ + "etsi280:address", + "etsi280:port" + ] + }, + "IPAddressPortRange": { + "type": "object", + "properties": { + "etsi280:address": { + "$ref": "#/$defs/IPAddress" + }, + "etsi280:portRange": { + "$ref": "#/$defs/PortRange" + } + }, + "required": [ + "etsi280:address", + "etsi280:portRange" + ] + }, + "WGS84CoordinateDecimal": { + "type": "object", + "properties": { + "etsi280:latitude": { + "$ref": "#/$defs/WGS84LatitudeDecimal" + }, + "etsi280:longitude": { + "$ref": "#/$defs/WGS84LongitudeDecimal" + } + }, + "required": [ + "etsi280:latitude", + "etsi280:longitude" + ] + }, + "WGS84CoordinateAngular": { + "type": "object", + "properties": { + "etsi280:latitude": { + "$ref": "#/$defs/WGS84LatitudeAngular" + }, + "etsi280:longitude": { + "$ref": "#/$defs/WGS84LongitudeAngular" + } + }, + "required": [ + "etsi280:latitude", + "etsi280:longitude" + ] + } + } +} \ No newline at end of file diff --git a/utils/json_to_xml.py b/utils/json_to_xml.py new file mode 100644 index 0000000000000000000000000000000000000000..17764a7af097aae7a2020522a1ae45adafa7d26a --- /dev/null +++ b/utils/json_to_xml.py @@ -0,0 +1,35 @@ +import sys +import logging +from pprint import pprint +import json +from pathlib import Path +import fileinput + +import xmltodict +import argparse + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') + parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + args = parser.parse_args() + + match args.verbose: + case v if v and v >= 2: + logging.basicConfig(level=logging.DEBUG) + case 1: + logging.basicConfig(level=logging.INFO) + case _: + logging.basicConfig(level=logging.WARNING) + + logging.debug(f"Arguments: {args}") + + s = args.input.read() + args.input.close() + + logging.debug(s) + j = json.loads(s) + + xml = xmltodict.unparse({'HI1Message' : j}, ) + print(xml) \ No newline at end of file diff --git a/utils/translate/ChoiceMapping.py b/utils/translate/ChoiceMapping.py new file mode 100644 index 0000000000000000000000000000000000000000..b477a3360e0a6a7b891f5854951eb4f37dce4b39 --- /dev/null +++ b/utils/translate/ChoiceMapping.py @@ -0,0 +1,47 @@ +import logging + +from xmlschema.validators.simple_types import * +from xmlschema.validators.complex_types import * +from xmlschema.validators.groups import * +from xmlschema.validators.facets import * + +from .TypeMapping import TypeMapping +from .ComplexTypeMapping import ComplexTypeMapping + +log = logging.getLogger() + +class ChoiceMapping(ComplexTypeMapping): + @classmethod + def process_choice(cls, choice: XsdGroup, current_ns : str, ns_to_id_map): + if choice.model != 'choice': + raise Exception(f"Wrong group type: {c.model}") + oneOf = [] + for c in choice.iter_model(): + if not (type(c) is XsdElement): + raise Exception (f"Non-element {c} encountered in choice {choice}") + element_name = c.local_name + if c.target_namespace in ns_to_id_map: + ns = ns_to_id_map[c.target_namespace] + if 'prefix' in ns: + element_name = ns['prefix'] + ":" + element_name + t = TypeMapping.get_type_from_elem(c, current_ns) + oneOf.append({ + "type" : "object", + "properties" : { + element_name : t + }, + "required" : [element_name] + }) + return oneOf + + def map(self, xst : BaseXsdType): + log.debug(f"Attempting mapping of {xst} to choice") + j = super().map(xst) + if j is None: + log.debug("Not a complex type, giving up") + return None + content = xst.content + if (content.model != 'choice'): + log.debug("Not a choice, giving up") + return None + return { 'oneOf' : ChoiceMapping.process_choice(content, xst.namespaces[''], self.ns_to_id_map)} diff --git a/utils/translate/ComplexTypeMapping.py b/utils/translate/ComplexTypeMapping.py new file mode 100644 index 0000000000000000000000000000000000000000..e18190901be2c3e0695c00a772be5364cce591d1 --- /dev/null +++ b/utils/translate/ComplexTypeMapping.py @@ -0,0 +1,11 @@ +from xmlschema.validators.complex_types import * + +from .TypeMapping import TypeMapping + +class ComplexTypeMapping(TypeMapping): + def map(self, xst: BaseXsdType): + if not (type(xst) is XsdComplexType): + return None + return { + "type" : "object" + } diff --git a/utils/translate/SequenceMapping.py b/utils/translate/SequenceMapping.py new file mode 100644 index 0000000000000000000000000000000000000000..4dc5c93e8a70c5a68cc753891032789d268c1be4 --- /dev/null +++ b/utils/translate/SequenceMapping.py @@ -0,0 +1,85 @@ +import logging + +from xmlschema.validators.simple_types import * +from xmlschema.validators.complex_types import * +from xmlschema.validators.groups import * +from xmlschema.validators.facets import * + +from .TypeMapping import TypeMapping +from .ChoiceMapping import ChoiceMapping +from .ComplexTypeMapping import ComplexTypeMapping + +log = logging.getLogger() + + +class SequenceMapping(ComplexTypeMapping): + def map(self, xst: BaseXsdType): + log.debug(f"Attempting mapping of {xst} to sequence") + j = super().map(xst) + if j is None: + log.debug("Not a complex type, giving up") + return None + content = xst.content + if (content.model != 'sequence'): + log.debug("Not a sequence, giving up") + return None + mapped_type = { + 'type' : 'object', + 'properties' : {}, + 'required' : [] + } + + # Not going to try and do all of this automatically for now + # Only make insert the xsiType parameter + if (xst.base_type): + # mapped_type['__DESCENDENT_OF__'] = TypeMapping.get_ref_for(xst.base_type, xst.namespaces['']) + mapped_type['properties']['@xsi:type'] = { + "type" : "string", + "enum" : xst.name + } + mapped_type['required'].append('@xsi:type') + # if xst.abstract: + # mapped_type['__ABSTRACT__'] = True + # pass + + inner_choice = None + for c in list(content.iter_model()): + log.debug(f"Processing model item {c}") + if type(c) is XsdElement: + element_name = c.local_name + if c.target_namespace in self.ns_to_id_map: + ns = self.ns_to_id_map[c.target_namespace] + if 'prefix' in ns: + element_name = ns['prefix'] + ":" + element_name + if c.effective_max_occurs != 1: + mapped_type['properties'][element_name] = { + "type" : "array", + "items" : TypeMapping.get_type_from_elem(c, xst.namespaces['']) + } + if c.effective_max_occurs: + mapped_type['properties'][element_name]['maxItems'] = c.effective_max_occurs + if c.effective_min_occurs > 0: + mapped_type['properties'][element_name]['minItems'] = c.effective_min_occurs + else: + mapped_type['properties'][element_name] = TypeMapping.get_type_from_elem(c, xst.namespaces['']) + if c.effective_min_occurs == 1: + mapped_type['required'].append(element_name) + elif type(c) is XsdGroup: + if inner_choice: + raise Exception (f"Second group '{element_name}' encountered in {xst}") + if c.model != "choice": + raise Exception (f"Don't know what to do with inner group {c} in {xst} - not a choice") + inner_choice = ChoiceMapping.process_choice(c, xst.namespaces[''], self.ns_to_id_map) + elif type(c) is XsdAnyElement: + mapped_type = {} + else: + raise Exception(f"Unknown element type {c}") + if (inner_choice): + return { + 'allOf' : [ + mapped_type, + {'oneOf' : inner_choice} + ] + } + else: + return mapped_type diff --git a/utils/translate/SimpleTypeMapping.py b/utils/translate/SimpleTypeMapping.py new file mode 100644 index 0000000000000000000000000000000000000000..2e60f9ca06b321a28992c2a32235707671f95735 --- /dev/null +++ b/utils/translate/SimpleTypeMapping.py @@ -0,0 +1,18 @@ +import logging + +from xmlschema.validators.complex_types import * +from xmlschema.validators.simple_types import XsdAtomicRestriction + +from .TypeMapping import TypeMapping + +log = logging.getLogger() + +class SimpleTypeMapping(TypeMapping): + def map(self, xst: BaseXsdType): + log.debug(f"Attempting mapping of {xst} to simple type") + if not (type(xst) is XsdAtomicRestriction): + log.debug("Type is not an XsdAtomicRestriction, giving up") + return None + return { + "$ref" : xst.base_type.name + } \ No newline at end of file diff --git a/utils/translate/TypeMapping.py b/utils/translate/TypeMapping.py new file mode 100644 index 0000000000000000000000000000000000000000..2b4b785ceeedb631c98ecf63df5c8d9754848b29 --- /dev/null +++ b/utils/translate/TypeMapping.py @@ -0,0 +1,70 @@ +import logging +from abc import ABC, abstractmethod + +from xmlschema.validators.simple_types import * +from xmlschema.validators.complex_types import * +from xmlschema.validators.groups import * +from xmlschema.validators.facets import * + +log = logging.getLogger() + +class TypeMapping(ABC): + ns_to_id_map = {} + + XSD_NS = "http://www.w3.org/2001/XMLSchema" + + XSD_TYPE_MAP = { + "string" : { "type" : "string" }, + "normalizedString" : { "type" : "string"}, + "dateTime" : { "type" : "string"}, + "token" : { "type" : "string"}, + "anyURI" : { "type" : "string" }, + + "integer" : { "type" : "integer"}, + "nonNegativeInteger" : { "type" : "integer", "minimum" : 0}, + "positiveInteger" : { "type" : "integer", "minimum" : 1}, + + "boolean" : { "type" : "boolean" }, + + "hexBinary" : { "type" : "string", "pattern" : "^([a-fA-F0-9]{2})*$"}, + "base64Binary" : { "type" : "string", "pattern" : "^[A-Za-z0-9+\/]*={0,3}$"}, + + "anyType" : {} + } + + @abstractmethod + def map(self, xst : BaseXsdType): + return None + + @classmethod + def extract_namespace(cls, qname: str): + match = re.search(r'^\{([^\{\}]+)\}(([^\{\}]+))$', qname) + if match is None: + return None + return match.group(1) + + @classmethod + def get_ref_for(cls, xsd_type: XsdType, current_ns : str): + ns = cls.extract_namespace(xsd_type.name) + if ns == current_ns: + return { "$ref" : f"#/$defs/{xsd_type.local_name}" } + else: + mapped_id = cls.ns_to_id_map[ns] + return { "$ref" : f"{mapped_id['id']}#/$defs/{xsd_type.local_name}"} + + @classmethod + def get_type_from_elem(cls, elem: XsdElement, current_ns : str): + ns = cls.extract_namespace(elem.type.name) + if (ns == TypeMapping.XSD_NS): + # this should be an XSD primitive type + return dict(TypeMapping.XSD_TYPE_MAP[elem.type.local_name]) + else: + return cls.get_ref_for(elem.type, current_ns) + + + + + + + + diff --git a/utils/translate/XSDNativeSimpleTypeMapping.py b/utils/translate/XSDNativeSimpleTypeMapping.py new file mode 100644 index 0000000000000000000000000000000000000000..772ac10b308d4c2128bcf1bac3271117d9bbe80e --- /dev/null +++ b/utils/translate/XSDNativeSimpleTypeMapping.py @@ -0,0 +1,72 @@ +import logging + +from xmlschema.validators.simple_types import * +from xmlschema.validators.complex_types import * +from xmlschema.validators.groups import * +from xmlschema.validators.facets import * + +from .TypeMapping import TypeMapping +from .SimpleTypeMapping import SimpleTypeMapping + +log = logging.getLogger() + +class XSDNativeSimpleTypeMapping(SimpleTypeMapping): + + def map(self, xst: BaseXsdType): + log.debug(f"Attempting mapping of {xst} to XSD native type") + j = super().map(xst) + if j is None: + log.debug("Not a simple type, giving up") + return None + + mapped_type = TypeMapping.XSD_TYPE_MAP.get(xst.base_type.local_name) + parent_type = None + + if mapped_type is None: + ns = TypeMapping.extract_namespace(xst.base_type.name) + if ns == XSDNativeSimpleTypeMapping.XSD_NS: + print (xst) + print (xst.base_type) + raise Exception (f"No mapping for xs:{xst.base_type.local_name}") + if len(xst.facets) == 0: + mapped_type = TypeMapping.get_ref_for(xst.base_type, xst.namespaces['']) + else: + parent_type = TypeMapping.get_ref_for(xst.base_type, xst.namespaces['']) + mapped_type = TypeMapping.XSD_TYPE_MAP.get(xst.root_type.local_name) + if mapped_type is None: + raise Exception (f"Could not find mapping for root type xs:{xst.root_type.local_name}") + + mapped_type = dict(mapped_type) + + for k, v in xst.facets.items(): + log.debug(f"Mapping facet {v}") + if type(v) is XsdMaxLengthFacet: + mapped_type['maxLength'] = v.value + continue + if type(v) is XsdMinLengthFacet: + mapped_type['minLength'] = v.value + continue + if type(v) is XsdPatternFacets: + if len(v.regexps) > 1: + raise Exception (f"Multiple patterns given in facet {v} of {xst}") + p = v.regexps[0] + if (not p.startswith('^')) and (not p.endswith('$')): + p = f"^{p}$" + mapped_type['pattern'] = p + continue + if type (v) is XsdMinInclusiveFacet: + mapped_type['minimum'] = v.value + continue + if type (v) is XsdMaxInclusiveFacet: + mapped_type['maximum'] = v.value + continue + if type (v) is XsdMinExclusiveFacet: + mapped_type['exclusiveMinimum'] = v.value + continue + if type (v) is XsdMaxExclusiveFacet: + mapped_type['exclusiveMaximum'] = v.value + continue + raise Exception (f"Unhandled facet {v}") + if parent_type: + return { 'allOf' : [parent_type, mapped_type] } + return mapped_type diff --git a/utils/translate/__init__.py b/utils/translate/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ba05008f9be181798aad7578d1cb1b75ee595f71 --- /dev/null +++ b/utils/translate/__init__.py @@ -0,0 +1,64 @@ +import logging + +from pathlib import Path +from xmlschema import * + +from .TypeMapping import * +from .XSDNativeSimpleTypeMapping import XSDNativeSimpleTypeMapping +from .ChoiceMapping import ChoiceMapping +from .SequenceMapping import SequenceMapping + + +log = logging.getLogger() + +mappings = [ + XSDNativeSimpleTypeMapping(), + ChoiceMapping(), + SequenceMapping(), +] + +def translate_schema (schema_path: str, ns_to_id_map: dict, schema_locations = []): + js = { + "$id" : "?", + "$defs" : {} + } + + logging.info(f"Translating schema {schema_path}") + xs = XMLSchema(schema_path, validation='lax', locations=schema_locations) + logging.info(f"Schema namespace: {xs.target_namespace}" ) + + schema_id = ns_to_id_map[xs.target_namespace]["id"] + js['$id'] = schema_id + + TypeMapping.ns_to_id_map = ns_to_id_map + + elementList = [] + for elementName, element in xs.elements.items(): + logging.info(f"Processing element {elementName} : {element}") + elementList.append(TypeMapping.get_ref_for(element.type, element.namespaces[''])) + if len(elementList) == 1: + js['$ref'] = elementList[0]['$ref'] + elif len(elementList) > 1: + js['oneOf'] = elementList + + descendent_types = {} + for type_name, xsd_type in xs.types.items(): + logging.info(f"Processing {type_name} : {xsd_type}") + + j = None + for mapping in mappings: + log.debug("\n----------------------------------------") + j = mapping.map(xsd_type) + if j is None: + continue + else: + break + if j is None: + raise Exception(f"Unmapped type {type_name} ({xsd_type})") + js["$defs"][xsd_type.local_name] = j + logging.debug (f"Mapped {type_name} to {j}") + + print (descendent_types) + return js + + diff --git a/utils/translate_spec.py b/utils/translate_spec.py new file mode 100644 index 0000000000000000000000000000000000000000..54ac5cd4417290c3bbd6700c9f7bf41feba19e60 --- /dev/null +++ b/utils/translate_spec.py @@ -0,0 +1,105 @@ +import json +import os +import logging +from pathlib import Path +import sys + +from xmlschema import * + +from translate import * + +logging.basicConfig(level = logging.INFO) + +def build_schema_locations (paths): + schema_locations = [] + for schemaFile in paths: + try: + xs = XMLSchema(schemaFile, validation='skip') + schema_locations.append((xs.target_namespace, str(Path(schemaFile).resolve()))) + logging.debug (" [ {0} -> {1} ]".format(xs.target_namespace, schemaFile)) + except XMLSchemaParseError as ex: + logging.debug (" [ {0} failed to parse: {1} ]".format(schemaFile, ex)) + return schema_locations + +def get_json(filename): + with open(filename) as f: + j = json.load(f) + return j + +def convert_ns_to_id (ns): + if ns.startswith('http://uri.etsi.org'): + c = ns.split("/") + return f"ts_1{c[3]}{'_' + c[7] if len(c) > 7 else ''}_{c[5]}_{c[6]}" + else: + return ns.replace("http://","").replace("/","_") + +def convert_xsd_to_filename (xsd): + f = Path(xsd) + return f.name.replace('.xsd', '.schema.json') + +if __name__ == "__main__": + if len(sys.argv) < 2: + logging.error ("Usage: translate_spec.py path_to_config_file") + exit(-1) + + config = get_json(sys.argv[1]) + + + logging.info("Bulding ns map...") + ns_map = {} + for location, settings in config['schemas'].items(): + xs = XMLSchema(location, validation='skip') + ns = xs.target_namespace + id = convert_ns_to_id(ns) + ns_map[ns] = { + "id" : id, + "location" : str(Path(location).resolve()) + } | settings + logging.debug(ns_map) + + logging.info("Building schema locations") + schema_locations = [(k, v["location"]) for k,v in ns_map.items()] + logging.debug(schema_locations) + + output_path = Path(config['output']) + if not output_path.exists(): + logging.info("Creating output directory") + os.mkdir(str(output_path)) + + logging.info("Translating schemas...") + json_schemas = {} + for schema_tuple in schema_locations: + logging.info(f" Translating {schema_tuple}") + if 'xmldsig' in (schema_tuple[1]): + # TODO - work out what to do here + logging.info(" Skipping XML Dsig...") + continue + js = translate_schema(schema_tuple[1], ns_map, schema_locations) + + # TODO - Special case, get rid of signature + if ns_map[schema_tuple[0]] == 'core.json': + js['$defs']['HI1Message']['properties'].pop('Signature') + js_path = output_path / convert_xsd_to_filename(schema_tuple[1]) + + # TODO - Special case - abstract HI1Object + if "Core" in schema_tuple[1]: + js["$defs"]['ConcreteHI1Object'] = { + 'oneOf' : [ + {'$ref' : 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'}, + {'$ref' : 'ts_103120_Task_2020_09#/$defs/LITaskObject'}, + {'$ref' : 'ts_103120_Task_2020_09#/$defs/LDTaskObject'}, + {'$ref' : 'ts_103120_Document_2020_09#/$defs/DocumentObject'}, + {'$ref' : 'ts_103120_Notification_2016_02#/$defs/NotificationObject'}, + {'$ref' : 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'}, + {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'}, + ] + } + + json_string = json.dumps(js, indent=2) + + if "Core" in schema_tuple[1]: + json_string = json_string.replace('"$ref": "#/$defs/HI1Object"', '"$ref": "#/$defs/ConcreteHI1Object"') + + with open(str(js_path), 'w') as f: + f.write(json_string) + json_schemas[js['$id']] = json.loads(json_string) diff --git a/utils/ts103120_config.json b/utils/ts103120_config.json new file mode 100644 index 0000000000000000000000000000000000000000..d21337616b4ad9abbaec83c2a06a149f9b89e1d5 --- /dev/null +++ b/utils/ts103120_config.json @@ -0,0 +1,36 @@ +{ + "schemas" : { + "./103120/schema/ts_103120_Authorisation.xsd" : { + "prefix" : "auth" + }, + "./103120/schema/ts_103120_Common.xsd" : { + "prefix" : "common" + }, + "./103120/schema/ts_103120_Core.xsd" : { + }, + "./103120/schema/ts_103120_Delivery.xsd" : { + "prefix" : "delivery" + }, + "./103120/schema/ts_103120_Document.xsd" : { + "prefix" : "doc" + }, + "./103120/schema/ts_103120_Notification.xsd" : { + "prefix" : "notification" + }, + "./103120/schema/ts_103120_Task.xsd" : { + "prefix" : "task" + }, + "./103120/schema/ts_103120_TrafficPolicy.xsd" : { + "prefix" : "tp" + }, + "./103280/TS_103_280.xsd" : { + "prefix" : "etsi280", + "skip" : true + }, + "./testing/deps/xmldsig/xmldsig-core-schema.xsd" : { + "prefix" : "xmldsig", + "skip" : true + } + }, + "output" : "./103120/schema/json/" +} \ No newline at end of file diff --git a/utils/validate.py b/utils/validate.py new file mode 100644 index 0000000000000000000000000000000000000000..17070bbbd1a96867c356331bcb7699cc652f034b --- /dev/null +++ b/utils/validate.py @@ -0,0 +1,120 @@ +import sys +from jsonschema import validate, RefResolver, Draft202012Validator +from jsonschema.exceptions import ValidationError +import json +from pathlib import Path +import logging +import argparse + + +def handle_uri(u): + print(u) + +def load_json(path : str): + with open(path) as f: + return json.load(f) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument('-s','--schemadir', action="append", help="Directory containing supporting schema files to use for validation") + parser.add_argument('-v', '--verbose', action="count", help="Verbose logging (can be specified multiple times)") + parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + parser.add_argument('schema', help="Primary schema to validate against") + + args = parser.parse_args() + + match args.verbose: + case v if v and v >= 2: + logging.basicConfig(level=logging.DEBUG) + case 1: + logging.basicConfig(level=logging.INFO) + case _: + logging.basicConfig(level=logging.WARNING) + + logging.debug(f"Arguments: {args}") + + instance_doc = json.loads(args.input.read()) + args.input.close() + main_schema = load_json(args.schema) + schema_dict = { main_schema['$id'] : main_schema } + + if args.schemadir: + schema_paths = [] + for d in args.schemadir: + logging.info(f"Searching {d}") + logging.info(list(Path(d).rglob("*.schema.json"))) + schema_paths += [f for f in Path(d).rglob("*.schema.json")] + logging.info(f"Schema files loaded: {schema_paths}") + + schemas_json = [json.load(p.open()) for p in schema_paths] + schema_dict = schema_dict | { s['$id'] : s for s in schemas_json } + + logging.info(f"Schema IDs loaded: {[k for k in schema_dict.keys()]}") + + logging.debug (f"Instance doc: {instance_doc}") + logging.debug (f"Main schema: {main_schema}") + + resolver = RefResolver(None, + referrer=None, + store=schema_dict) + + v = Draft202012Validator(main_schema, resolver=resolver) + try: + v.validate(instance_doc) + except ValidationError as ex: + # Any failure within the Payload element results in a failure against the oneOf constraint in the Payload element + # This isn't terribly helpful in working out what is actually wrong, so in this case we attempt an explicit + # validation against the relevant oneOf alternation to try and get a more useful validation error + if list(ex.schema_path) == ['properties', 'Payload', 'oneOf']: + logging.error ("Error detected validating payload oneOf - attempting explicit validation...") + try: + if 'RequestPayload' in instance_doc['Payload'].keys(): + request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/RequestPayload" } + v = Draft202012Validator(request_fragment_schema, resolver=resolver) + v.validate(instance_doc['Payload']['RequestPayload']) + elif 'ResponsePayload' in instance_doc['Payload'].keys(): + request_fragment_schema = { "$ref" : "ts_103120_Core_2019_10#/$defs/ResponsePayload" } + v = Draft202012Validator(request_fragment_schema, resolver=resolver) + v.validate(instance_doc['Payload']['ResponsePayload']) + else: + logging.error("No RequestPayload or ResponsePayload found - is the Payload malformed?") + raise ex + except ValidationError as ex2: + # Similar to above, this is inner validation to try and get a more useful error in the event + # that something fails the verb oneOf constraint + logging.error("Error detected in ActionRequests/ActionResponses") + error_path = list(ex2.schema_path) + if error_path != ['properties', 'ActionRequests', 'properties', 'ActionRequest', 'items', 'allOf', 1, 'oneOf'] and error_path != ['properties', 'ActionResponses', 'properties', 'ActionResponse', 'items', 'allOf', 1, 'oneOf']: + logging.error("Error not in inner Request/Response allOf/oneOf constraint") + raise ex2 + j = ex2.instance + j.pop('ActionIdentifier') # Remove ActionIdentifier - one remaining key will be the verb + verb = list(j.keys())[0] + message = "Request" if error_path[1] == "ActionRequests" else "Response" + v = Draft202012Validator({"$ref" : f"ts_103120_Core_2019_10#/$defs/{verb}{message}"}, resolver=resolver) + try: + v.validate(j[verb]) + except ValidationError as ex3: + logging.error("Error detected in verb") + # The final level of validation is for the actual HI1Object validation + if list(ex3.schema_path) != ['properties', 'HI1Object', 'oneOf']: + logging.error("Error not inside HI1Object") + raise ex3 + object_type = ex3.instance['@xsi:type'].split('}')[-1] + object_ref = { + 'AuthorisationObject': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject', + 'LITaskObject': 'ts_103120_Task_2020_09#/$defs/LITaskObject', + 'LDTaskObject': 'ts_103120_Task_2020_09#/$defs/LDTaskObject', + 'DocumentObject': 'ts_103120_Document_2020_09#/$defs/DocumentObject', + 'NotificationObject': 'ts_103120_Notification_2016_02#/$defs/NotificationObject', + 'DeliveryObject': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject', + 'TrafficPolicyObject': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject' + }[object_type] + v = Draft202012Validator({"$ref" : object_ref}, resolver=resolver) + v.validate(ex3.instance) + + exit(-1) + + + logging.info("Done") \ No newline at end of file diff --git a/utils/xml_to_json.py b/utils/xml_to_json.py new file mode 100644 index 0000000000000000000000000000000000000000..66145f9b47f30464b43be004fcf37ce27f147c34 --- /dev/null +++ b/utils/xml_to_json.py @@ -0,0 +1,117 @@ +import sys +import logging +from pprint import pprint +import json +from pathlib import Path +import fileinput + +import xmltodict +import argparse + + +def extract_prefixes (d): + return { k.split(':')[1]: d[k] for k in d.keys() if k.startswith("@xmlns:") } + +def removePrefixes (o, prefixes): + if not isinstance(o, dict): return + replacements = [] + for k,v in o.items(): + if isinstance(v, dict): + removePrefixes(v, prefixes) + if isinstance(v, list): + for i in v: + removePrefixes(i, prefixes) + if ":" in k: + prefix = k.split(':')[0] + if (prefix) in prefixes: + new_key = k.split(':')[1] + replacements.append( (k, new_key) ) + for r in replacements: + o[r[1]] = o.pop(r[0]) + +object_namespaces = { + 'AuthorisationObject' : 'http://uri.etsi.org/03120/common/2020/09/Authorisation', + 'DeliveryObject' : 'http://uri.etsi.org/03120/common/2019/10/Delivery', + 'DocumentObject' : 'http://uri.etsi.org/03120/common/2020/09/Document', + 'NotificationObject' : 'http://uri.etsi.org/03120/common/2016/02/Notification', + 'LITaskObject' : 'http://uri.etsi.org/03120/common/2020/09/Task', + 'LDTaskObject' : 'http://uri.etsi.org/03120/common/2020/09/Task', + 'TrafficPolicyObject' : 'http://uri.etsi.org/03120/common/2022/07/TrafficPolicy', + 'TrafficRuleObject' : 'http://uri.etsi.org/03120/common/2022/07/TrafficPolicy' +} + +coerce_to_list = [ + 'auth:AuthorisationApprovalDetails', + 'auth:AuthorisationFlag', + 'auth:CSPID', + 'common:ApproverContactDetails', + 'ActionRequest', + 'ActionResponse', + 'ListResponseRecord', + 'AssociatedObject', + 'doc:DocumentSignature', + 'doc:DocumentProperty', + 'notification:AssociatedObjectStatus', + 'task:ApprovalDetails', + 'task:TargetIdentifierValue', + 'task:DeliveryDestination', + 'task:TaskFlag', + 'task:ApprovalDetails', + 'task:ObservedTimes', + 'task:RequestValue', + 'task:RequestSubtype', + 'task:LDDeliveryDestination', + 'task:LDTaskFlag', + 'task:TrafficPolicyReference', + 'tp:TrafficRuleReference', + 'tp:Criteria' +] + +coerce_to_int = [ + 'ActionIdentifier', + 'delivery:SequenceNumber' +] + +coerce_to_bool = [ + 'delivery:LastSequence' +] + +def postprocessor (path, key, value): + if key == "@xsi:type": + object_name = value.split(":")[-1] + if object_name in object_namespaces.keys(): + value = "{" + object_namespaces[object_name] + "}" + object_name + return key, value + if key in coerce_to_int: + return key, int(value) + if key in coerce_to_bool: + return key, bool(value) + return key, value + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', action='count', help='Verbose logging (can be specified multiple times)') + parser.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin, help="Path to input file (if absent, stdin is used)") + args = parser.parse_args() + + match args.verbose: + case v if v and v >= 2: + logging.basicConfig(level=logging.DEBUG) + case 1: + logging.basicConfig(level=logging.INFO) + case _: + logging.basicConfig(level=logging.WARNING) + + logging.debug(f"Arguments: {args}") + + s = args.input.read() + args.input.close() + + logging.debug(s) + + d = xmltodict.parse(s, + force_list=tuple(coerce_to_list), + postprocessor=postprocessor + )['HI1Message'] + + print(json.dumps(d, indent=2))