Skip to content
translate_spec.py 3.83 KiB
Newer Older
canterburym's avatar
canterburym committed
import json
import os
import logging
from pathlib import Path
import sys

from xmlschema import *

from translate import *

logging.basicConfig(level = logging.INFO)

def build_schema_locations (paths):
    schema_locations = []
    for schemaFile in paths:
        try:
            xs = XMLSchema(schemaFile, validation='skip')
            schema_locations.append((xs.target_namespace, str(Path(schemaFile).resolve())))
            logging.debug (" [ {0}  ->  {1} ]".format(xs.target_namespace, schemaFile))
        except XMLSchemaParseError as ex:
            logging.debug (" [ {0} failed to parse:  {1} ]".format(schemaFile, ex))
    return schema_locations

def get_json(filename):
    with open(filename) as f:
        j = json.load(f)
    return j

def convert_ns_to_id (ns):
    if ns.startswith('http://uri.etsi.org'):
        c = ns.split("/")
        return f"ts_1{c[3]}{'_' + c[7] if len(c) > 7 else ''}_{c[5]}_{c[6]}"
    else:
        return ns.replace("http://","").replace("/","_")

def convert_xsd_to_filename (xsd):
    f = Path(xsd)
    return f.name.replace('.xsd', '.schema.json')

if __name__ == "__main__":
    if len(sys.argv) < 2:
        logging.error ("Usage: translate_spec.py path_to_config_file")
        exit(-1)

    config = get_json(sys.argv[1])


    logging.info("Bulding ns map...")
    ns_map = {}
    for location, settings in config['schemas'].items():
        xs = XMLSchema(location, validation='skip')
        ns = xs.target_namespace
        id = convert_ns_to_id(ns)
        ns_map[ns] = {
            "id" : id,
            "location" : str(Path(location).resolve())
        } | settings
    logging.debug(ns_map)
    
    logging.info("Building schema locations")
    schema_locations = [(k, v["location"]) for k,v in ns_map.items()]
canterburym's avatar
canterburym committed
    logging.debug(schema_locations)

    output_path = Path(config['output'])
    if not output_path.exists():
        logging.info("Creating output directory")
        os.mkdir(str(output_path))

    logging.info("Translating schemas...")
    json_schemas = {}
    for schema_tuple in schema_locations:
        logging.info(f"  Translating {schema_tuple}")
        if 'xmldsig' in (schema_tuple[1]):
            # TODO - work out what to do here
            logging.info("  Skipping XML Dsig...")
            continue
        js = translate_schema(schema_tuple[1], ns_map, schema_locations)

        # TODO - Special case, get rid of signature
        if ns_map[schema_tuple[0]] == 'core.json':
canterburym's avatar
canterburym committed
            js['$defs']['HI1Message']['properties'].pop('Signature')
        js_path = output_path / convert_xsd_to_filename(schema_tuple[1])

        # TODO - Special case - abstract HI1Object
canterburym's avatar
canterburym committed
        if "Core" in schema_tuple[1]:
            js["$defs"]['ConcreteHI1Object'] = {
                'oneOf' : [
                    {'$ref' : 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'},
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LITaskObject'},
canterburym's avatar
canterburym committed
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LPTaskObject'},
canterburym's avatar
canterburym committed
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LDTaskObject'},
                    {'$ref' : 'ts_103120_Document_2020_09#/$defs/DocumentObject'},
                    {'$ref' : 'ts_103120_Notification_2016_02#/$defs/NotificationObject'},
                    {'$ref' : 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'},
                    {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'},
canterburym's avatar
canterburym committed
                    {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject'},
canterburym's avatar
canterburym committed
                ]
            }

        json_string = json.dumps(js, indent=2)

        if "Core" in schema_tuple[1]:
            json_string = json_string.replace('"$ref": "#/$defs/HI1Object"', '"$ref": "#/$defs/ConcreteHI1Object"')

        with open(str(js_path), 'w') as f:
            f.write(json_string)
        json_schemas[js['$id']] = json.loads(json_string)