Skip to content
translate_spec.py 4.43 KiB
Newer Older
canterburym's avatar
canterburym committed
import json
import os
import logging
from pathlib import Path
import sys

from xmlschema import *

from translate import *

import jsonschema

logging.basicConfig(level = logging.INFO)

def build_schema_locations (paths):
    schema_locations = []
    for schemaFile in paths:
        try:
            xs = XMLSchema(schemaFile, validation='skip')
            schema_locations.append((xs.target_namespace, str(Path(schemaFile).resolve())))
            logging.debug (" [ {0}  ->  {1} ]".format(xs.target_namespace, schemaFile))
        except XMLSchemaParseError as ex:
            logging.debug (" [ {0} failed to parse:  {1} ]".format(schemaFile, ex))
    return schema_locations

def get_json(filename):
    with open(filename) as f:
        j = json.load(f)
    return j

def convert_ns_to_id (ns):
    if ns.startswith('http://uri.etsi.org'):
        c = ns.split("/")
        return f"ts_1{c[3]}{'_' + c[7] if len(c) > 7 else ''}_{c[5]}_{c[6]}"
    else:
        return ns.replace("http://","").replace("/","_")

def convert_xsd_to_filename (xsd):
    f = Path(xsd)
    return f.name.replace('.xsd', '.schema.json')

if __name__ == "__main__":
    if len(sys.argv) < 2:
        logging.error ("Usage: translate_spec.py path_to_config_file")
        exit(-1)

    config = get_json(sys.argv[1])

    schema_paths = config['schemas']
    logging.info("Bulding schema locations...")
    schema_locations = build_schema_locations(schema_paths)
    logging.debug(schema_locations)
        
    # ns_to_id_map = { 
    #     schema[0]: schema[0].split('/')[-1].lower() + ".json" 
    #     for schema in schema_locations if '03120' in schema[0] 
    # } | {
    #     'http://uri.etsi.org/03280/common/2017/07' : 'etsi103280.json',
    #     'http://www.w3.org/2000/09/xmldsig#' : 'xmldsig.json',
    # }
    logging.info("Constructing XSD NS to JSON Schema ID mapping...")
    ns_to_id_map = { schema[0] : convert_ns_to_id(schema[0]) for schema in schema_locations}
    logging.debug(ns_to_id_map)

    #     # js = translate_schema("103280/TS_103_280.xsd", "103120.json")
    #     # print(json.dumps(js, indent=2))

    output_path = Path(config['output'])
    if not output_path.exists():
        logging.info("Creating output directory")
        os.mkdir(str(output_path))

    logging.info("Translating schemas...")
    json_schemas = {}
    for schema_tuple in schema_locations:
        logging.info(f"  Translating {schema_tuple}")
        if 'xmldsig' in (schema_tuple[1]):
            # TODO - work out what to do here
            logging.info("  Skipping XML Dsig...")
            continue
        js = translate_schema(schema_tuple[1], ns_to_id_map, schema_locations)
        if ns_to_id_map[schema_tuple[0]] == 'core.json':
            js['$defs']['HI1Message']['properties'].pop('Signature')
        js_path = output_path / convert_xsd_to_filename(schema_tuple[1])

        # TODO - work out how to do this substitution automatically
        # and build the graph of acceptable descendent types
        if "Core" in schema_tuple[1]:
            js["$defs"]['ConcreteHI1Object'] = {
                'oneOf' : [
                    {'$ref' : 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'},
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LITaskObject'},
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LDTaskObject'},
                    {'$ref' : 'ts_103120_Document_2020_09#/$defs/DocumentObject'},
                    {'$ref' : 'ts_103120_Notification_2016_02#/$defs/NotificationObject'},
                    {'$ref' : 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'},
                    {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'},
                ]
            }

        json_string = json.dumps(js, indent=2)

        if "Core" in schema_tuple[1]:
            json_string = json_string.replace('"$ref": "#/$defs/HI1Object"', '"$ref": "#/$defs/ConcreteHI1Object"')

        with open(str(js_path), 'w') as f:
            f.write(json_string)
        json_schemas[js['$id']] = json.loads(json_string)

    # else:
    #     json_schemas = {}
    #     json_path = Path('json/')
    #     for json_file in json_path.glob("*.json"):
    #         json_schemas[json_file.name] = get_json(json_file)
    
    # resolver = jsonschema.RefResolver("", "", json_schemas)

    # instance = get_json("120.json")
    # schema = json_schemas['core.json']
    # jsonschema.validate(instance, schema, resolver=resolver)
    
    # print(json.dumps(js, indent=2))