# translate_spec.py
# Translates the XSD schemas listed in a JSON config file into JSON Schema files.
import json
import os
import logging
from pathlib import Path
import sys

from xmlschema import *

from translate import *

logging.basicConfig(level=logging.INFO)

# JSON Schema fragment with two required string members, "protected" and
# "signature". The main script installs it as the 'Signature' property of
# HI1Message in place of the XML 'xmldsig:Signature' property.
json_signature_struct = {
    "properties": {
        "protected": {"type": "string"},
        "signature": {"type": "string"},
    },
    "required": ["protected", "signature"],
}

# Helper functions
def build_schema_locations(paths):
    """Pair each parseable XSD file with its target namespace.

    Args:
        paths: Iterable of XSD file paths.

    Returns:
        A list of ``(target_namespace, resolved_path)`` tuples, in input
        order, with one entry per file that parsed. Files that raise
        ``XMLSchemaParseError`` are reported and left out of the result.
    """
    schema_locations = []
    for schema_file in paths:
        # Keep the try body down to the one call whose failure is handled.
        try:
            xs = XMLSchema(schema_file, validation='skip')
        except XMLSchemaParseError as ex:
            # Reported at WARNING: this module configures logging at INFO, so
            # a DEBUG record would silently hide that a schema was dropped.
            logging.warning(" [ {0} failed to parse:  {1} ]".format(schema_file, ex))
            continue
        schema_locations.append((xs.target_namespace, str(Path(schema_file).resolve())))
        logging.debug(" [ {0}  ->  {1} ]".format(xs.target_namespace, schema_file))
    return schema_locations

def get_json(filename):
    """Load and return the JSON document stored in *filename*.

    The file is read as UTF-8 explicitly, so the result does not depend on
    the platform's locale encoding.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded JSON value, as produced by ``json.load``.
    """
    with open(filename, encoding="utf-8") as f:
        return json.load(f)

def convert_ns_to_id(ns):
    """Derive a schema identifier from an XML namespace URI.

    Namespaces starting with ``http://uri.etsi.org`` are rebuilt from their
    '/'-separated parts as ``ts_1<part3>[_<part7>]_<part5>_<part6>``; for
    example ``http://uri.etsi.org/03120/common/2019/10/Core`` becomes
    ``ts_103120_Core_2019_10``. The ``_<part7>`` piece is only present when
    the URI has an eighth part.

    Any other namespace has ``http://`` removed and every ``/`` turned into
    ``_``. An ETSI namespace too short to supply parts 3, 5 and 6 is handled
    the same way rather than raising ``IndexError``.

    Args:
        ns: The namespace URI string.

    Returns:
        The identifier string.
    """
    parts = ns.split("/")
    # Indexes 3, 5 and 6 are read unconditionally in the ETSI form, so it
    # needs at least seven parts; index 7 is optional.
    if ns.startswith('http://uri.etsi.org') and len(parts) >= 7:
        name_part = '_' + parts[7] if len(parts) > 7 else ''
        return f"ts_1{parts[3]}{name_part}_{parts[5]}_{parts[6]}"
    return ns.replace("http://", "").replace("/", "_")

def convert_xsd_to_filename(xsd):
    """Return the JSON Schema file name that corresponds to an XSD path.

    Only the final path component is used. A trailing ``.xsd`` extension is
    swapped for ``.schema.json``; any other ``.xsd`` text inside the name is
    left alone, and a name without that extension is returned unchanged.

    Args:
        xsd: Path (string or path-like) of the XSD file.

    Returns:
        The output file name, without any directory part.
    """
    name = Path(xsd).name
    extension = '.xsd'
    # Anchor the swap to the end of the name; a plain str.replace would also
    # rewrite '.xsd' occurring in the middle of the file name.
    if name.endswith(extension):
        return name[:-len(extension)] + '.schema.json'
    return name

if __name__ == "__main__":
    # Script entry point: load the config file named on the command line,
    # translate every schema it lists and write one JSON file per schema into
    # the configured output directory.
    if len(sys.argv) < 2:
        logging.error("Usage: translate_spec.py path_to_config_file")
        # sys.exit instead of the bare exit() helper, which is only injected
        # by the site module and is not guaranteed to exist.
        sys.exit(-1)

    config = get_json(sys.argv[1])

    # Map each schema's target namespace to its id, resolved location and any
    # per-schema settings given in the config.
    logging.info("Bulding ns map...")
    ns_map = {}
    for location, settings in config['schemas'].items():
        xs = XMLSchema(location, validation='skip')
        ns = xs.target_namespace
        schema_id = convert_ns_to_id(ns)
        ns_map[ns] = {
            "id": schema_id,
            "location": str(Path(location).resolve()),
        } | settings
    logging.debug(ns_map)

    logging.info("Building schema locations")
    schema_locations = [(k, v["location"]) for k, v in ns_map.items()]
    logging.debug(schema_locations)

    output_path = Path(config['output'])
    if not output_path.exists():
        logging.info("Creating output directory")
        # parents/exist_ok: also create missing parent directories and do not
        # fail if the directory appears between the check and the call.
        output_path.mkdir(parents=True, exist_ok=True)

    # Replacement for the abstract HI1Object: any one of the concrete object
    # types referenced below.
    concrete_hi1_object = {
        'oneOf': [
            {'$ref': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'},
            {'$ref': 'ts_103120_Task_2020_09#/$defs/LITaskObject'},
            {'$ref': 'ts_103120_Task_2020_09#/$defs/LPTaskObject'},
            {'$ref': 'ts_103120_Task_2020_09#/$defs/LDTaskObject'},
            {'$ref': 'ts_103120_Document_2020_09#/$defs/DocumentObject'},
            {'$ref': 'ts_103120_Notification_2016_02#/$defs/NotificationObject'},
            {'$ref': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'},
            {'$ref': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'},
            {'$ref': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject'},
        ]
    }

    logging.info("Translating schemas...")
    json_schemas = {}
    for schema_tuple in schema_locations:
        namespace, schema_file = schema_tuple
        logging.info(f"  Translating {schema_tuple}")
        # The mere presence of a 'skip' key excludes the schema; its value is
        # not inspected.
        if 'skip' in ns_map[namespace]:
            logging.info(f"  Skipping {namespace}...")
            continue
        js = translate_schema(schema_file, ns_map, schema_locations)

        # TODO - Special case, get rid of XML Dsig signature and insert JSON signature
        if namespace == 'http://uri.etsi.org/03120/common/2019/10/Core':
            logging.info("Modifying signature elements")
            js['$defs']['HI1Message']['properties'].pop('xmldsig:Signature')
            js['$defs']['HI1Message']['properties']['Signature'] = json_signature_struct

        js_path = output_path / convert_xsd_to_filename(schema_file)

        # Test the file name only: schema_file is an absolute path, so a
        # parent directory containing "Core" would otherwise match every
        # schema.
        is_core_schema = "Core" in Path(schema_file).name

        # TODO - Special case - abstract HI1Object
        if is_core_schema:
            js["$defs"]['ConcreteHI1Object'] = concrete_hi1_object

        json_string = json.dumps(js, indent=2)

        if is_core_schema:
            # Redirect local references from the abstract object to the
            # concrete union added above.
            json_string = json_string.replace('"$ref": "#/$defs/HI1Object"', '"$ref": "#/$defs/ConcreteHI1Object"')

        # newline='\n' keeps LF line endings on every platform.
        with open(str(js_path), 'w', newline='\n', encoding='utf-8') as f:
            f.write(json_string)
        # Store the schema as written, i.e. after the textual $ref rewrite.
        json_schemas[js['$id']] = json.loads(json_string)