Commit c6d8e230 authored by Mark Canterbury's avatar Mark Canterbury
Browse files

Refactoring and making per-file

parent ec338d9e
Loading
Loading
Loading
Loading

translate.py

0 → 100644
+14 −0
Original line number Diff line number Diff line
import json
import logging
from pathlib import Path

from xmlschema import *

from translate_schema import *

logging.basicConfig(level = logging.DEBUG)

if __name__ == "__main__":    
    js = translate_schema("103280/TS_103_280.xsd", "103120.json")
    print(json.dumps(js, indent=2))
+42 −0
Original line number Diff line number Diff line
import logging
from numbers import Complex
from pathlib import Path

from xmlschema import *

from .type_mapping import *

log = logging.getLogger()

mappings = [
    XSDNativeSimpleTypeMapping(),
    ChoiceMapping(),
    SequenceMapping(),
]

def translate_schema (schema_path, schema_id):
    js = {
        "$id" : schema_id,
        "$defs" : {}
    }

    logging.info(f"Translating schema {schema_path}")
    xs = XMLSchema(schema_path, validation='lax')
    logging.info(f"Schema namespace: {xs.target_namespace}" )

    for type_name, xsd_type in xs.types.items():
        logging.info(f"Processing {type_name} : {xsd_type}")

        j = None
        for mapping in mappings:
            log.debug("\n----------------------------------------")
            j = mapping.map(xsd_type)
            if j is None:
                continue
            else:
                break
        if j is None:
            raise Exception(f"Unmapped type {type_name} ({xsd_type})")
        js["$defs"][xsd_type.local_name] = j
        logging.debug (f"Mapped {type_name} to {j}")
    return js
 No newline at end of file
+185 −0
Original line number Diff line number Diff line
from copy import deepcopy
import logging
from abc import ABC, abstractmethod

from xmlschema.validators.simple_types import *
from xmlschema.validators.complex_types import *
from xmlschema.validators.groups import *
from xmlschema.validators.facets import *

from .util import *

log = logging.getLogger()

class TypeMapping(ABC):
    @abstractmethod
    def map(self, xst : BaseXsdType):
        return None

class SimpleTypeMapping(TypeMapping):
    def map(self, xst: BaseXsdType):
        log.debug(f"Attempting mapping of {xst} to simple type")
        if not (type(xst) is XsdAtomicRestriction):
            log.debug("Type is not an XsdAtomicRestriction, giving up")
            return None
        return {
            "$ref" : xst.base_type.name
        }

class XSDNativeSimpleTypeMapping(SimpleTypeMapping):
    XSD_NS = "http://www.w3.org/2001/XMLSchema"

    XSD_TYPE_MAP = {
        "string" : { "type" : "string" },
        "normalizedString" : { "type" : "string", "pattern" : "^[^\r\n\t]*$"},
        "dateTime" : { "type" : "string"},
        "token" : { "type" : "string", "pattern" : "^[^\r\n\t]*$"},
        "anyURI" : { "type" : "string" },

        "integer" : { "type" : "integer"},

        "hexBinary" : { "type" : "string", "pattern" : "^([a-fA-F0-9]{2})*$"}
    }

    def map(self, xst: BaseXsdType):
        log.debug(f"Attempting mapping of {xst} to XSD native type")
        j = super().map(xst)
        if j is None:
            log.debug("Not a simple type, giving up")
            return None


        mapped_type = XSDNativeSimpleTypeMapping.XSD_TYPE_MAP.get(xst.base_type.local_name)
        parent_type = None
        if mapped_type is None:
            ns = extract_namespace(xst.base_type.name)
            if ns == XSDNativeSimpleTypeMapping.XSD_NS:
                raise Exception (f"No mapping for xs:{xst.base_type.local_name}")
            parent_type = get_ref_for(xst.base_type, xst.namespaces[''])
            mapped_type = XSDNativeSimpleTypeMapping.XSD_TYPE_MAP.get(xst.root_type.local_name)
            if mapped_type is None:
                raise Exception (f"Could not find mapping for root type xs:{xst.root_type.local_name}")

        mapped_type = dict(mapped_type)
        
        for k, v in xst.facets.items():
            log.debug(f"Mapping facet {v}")
            if type(v) is XsdMaxLengthFacet:
                mapped_type['maxLength'] = v.value
                continue
            if type(v) is XsdMinLengthFacet:
                mapped_type['minLength'] = v.value
                continue
            if type(v) is XsdPatternFacets:
                if len(v.regexps) > 1:
                    raise Exception (f"Multiple patterns given in facet {v} of {xst}")
                p = v.regexps[0]
                if (not p.startswith('^')) and (not p.endswith('$')):
                    p = f"^{p}$"
                mapped_type['pattern'] = p
                continue
            if type (v) is XsdMinInclusiveFacet:
                mapped_type['minimum'] = v.value
                continue
            if type (v) is XsdMaxInclusiveFacet:
                mapped_type['maximum'] = v.value
                continue
            if type (v) is XsdMinExclusiveFacet:
                mapped_type['exclusiveMinimum'] = v.value
                continue
            if type (v) is XsdMaxExclusiveFacet:
                mapped_type['exclusiveMaximum'] = v.value
                continue
            raise Exception (f"Unhandled facet {v}")
        if parent_type:
            return { 'allOf' : [parent_type, mapped_type] }
        return mapped_type


class ComplexTypeMapping(TypeMapping):
    def map(self, xst: BaseXsdType):
        if not (type(xst) is XsdComplexType):
            return None
        return {
            "type" : "object"
        }

def get_type_from_elem(elem: XsdElement, current_ns : str):
    ns = extract_namespace(elem.type.name)
    if (ns == "http://www.w3.org/2001/XMLSchema"):
        # this should be an XSD primitive type
        return dict(XSDNativeSimpleTypeMapping.XSD_TYPE_MAP[elem.type.local_name])
    else:
        return get_ref_for(elem.type, current_ns)


def process_choice(choice: XsdGroup, current_ns : str):
    if choice.model != 'choice':
        raise Exception(f"Wrong group type: {c.model}")
    oneOf = []
    for c in choice.iter_model():
        if not (type(c) is XsdElement):
            raise Exception (f"Non-element {c} encountered in choice {choice}")
        t = get_type_from_elem(c, current_ns)
        oneOf.append({
            "type" : "object",
            "properties" : {
                c.local_name : t
            },
            "required" : [c.local_name]
        })
    return oneOf   

class ChoiceMapping(ComplexTypeMapping):
    def map(self, xst : BaseXsdType):
        log.debug(f"Attempting mapping of {xst} to sequence")
        j = super().map(xst)
        if j is None:
            log.debug("Not a complex type, giving up")
            return None
        content = xst.content
        if (content.model != 'choice'):
            log.debug("Not a sequence, giving up")
            return None
        return { 'oneOf' : process_choice(content, xst.namespaces[''])}

class SequenceMapping(ComplexTypeMapping):
    def map(self, xst: BaseXsdType):
        log.debug(f"Attempting mapping of {xst} to sequence")
        j = super().map(xst)
        if j is None:
            log.debug("Not a complex type, giving up")
            return None
        content = xst.content
        if (content.model != 'sequence'):
            log.debug("Not a sequence, giving up")
            return None
        mapped_type = {
            'type' : 'object',
            'properties' : {},
            'required' : []
        }
        inner_choice = None
        for c in list(content.iter_model()):
            log.debug(f"Processing model item {c}")
            if type(c) is XsdElement:
                mapped_type['properties'][c.local_name] = get_type_from_elem(c, xst.namespaces[''])
                if c.effective_min_occurs == 1:
                    mapped_type['required'].append(c.local_name)
            elif type(c) is XsdGroup:
                if inner_choice:
                    raise Exception (f"Second group '{c.local_name}' encountered in {xst}")
                if c.model != "choice":
                    raise Exception (f"Don't know what to do with inner group {c} in {xst} - not a choice")
                inner_choice = process_choice(c)
            else:
                raise Exception(f"Unknown element type {c}")
        if (inner_choice):
            return { 
                'allOf' : [
                    mapped_type,
                    {'oneOf' : inner_choice}
                ]
            }
        else:
            return mapped_type
+24 −0
Original line number Diff line number Diff line
import re

from xmlschema import *

def extract_namespace(qname: str):
    match = re.search(r'^\{([^\{\}]+)\}(([^\{\}]+))$', qname)
    if match is None:
        return None
    return match.group(1)

NS_MAP = {
    'http://uri.etsi.org/03120/common/2019/10/Core' : 'core.json',
    'http://uri.etsi.org/03120/common/2016/02/Common' : 'common.json',
    'http://uri.etsi.org/03280/common/2017/07' : 'etsi103280.json',
    'http://www.w3.org/2000/09/xmldsig#' : 'xmldsig',
}

def get_ref_for(xsd_type: XsdType, current_ns : str):
    ns = extract_namespace(xsd_type.name)
    if ns == current_ns:
        return { "$ref" : f"#/$defs/{xsd_type.local_name}" }
    else:
        mapped_id = NS_MAP[ns]
        return { "$ref" : f"{mapped_id}#/$defs/{xsd_type.local_name}"}
 No newline at end of file