Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import json
import os
import logging
from pathlib import Path
import sys
from xmlschema import *
from translate import *
# Emit INFO and above on the root logger; the logging.debug calls in this
# script are therefore not shown unless this level is lowered to DEBUG.
logging.basicConfig(level = logging.INFO)
def build_schema_locations (paths):
    """Map each parseable schema file to its target namespace.

    Every entry of *paths* is loaded with ``XMLSchema(..., validation='skip')``.
    For each file that loads, a ``(target_namespace, absolute_path)`` tuple is
    collected, where ``absolute_path`` is the resolved path as a string.
    Files that raise ``XMLSchemaParseError`` are reported at debug level and
    left out of the result; no exception is propagated for them.

    Returns the collected tuples as a list, in the order of *paths*.
    """
    located = []
    for candidate in paths:
        try:
            parsed = XMLSchema(candidate, validation='skip')
            namespace = parsed.target_namespace
            absolute = str(Path(candidate).resolve())
            located.append((namespace, absolute))
            logging.debug (" [ {0} -> {1} ]".format(parsed.target_namespace, candidate))
        except XMLSchemaParseError as err:
            # Best effort: an unparseable schema is only noted, not fatal.
            logging.debug (" [ {0} failed to parse: {1} ]".format(candidate, err))
    return located
def get_json(filename):
    """Load and return the JSON document stored in *filename*.

    The file is decoded as UTF-8 explicitly, so the result does not depend on
    the platform's locale encoding when the document contains non-ASCII text.

    Raises whatever ``open`` raises for an unreadable path (``OSError``) and
    ``json.JSONDecodeError`` if the content is not valid JSON.
    """
    with open(filename, encoding="utf-8") as f:
        return json.load(f)
def convert_ns_to_id(ns):
    """Derive a schema identifier string from an XML namespace URI.

    Namespaces starting with ``http://uri.etsi.org`` are split on "/" and
    rebuilt from their components as ``ts_1<c[3]>[_<c[7]>]_<c[5]>_<c[6]>``,
    the ``_<c[7]>`` part being present only when an eighth component exists:

        'http://uri.etsi.org/03120/common/2019/10/Core' -> 'ts_103120_Core_2019_10'
        'http://uri.etsi.org/03280/common/2017/07'      -> 'ts_103280_2017_07'

    Any other namespace, and an ETSI namespace too short to contain the
    components read above, is converted generically by dropping "http://"
    and replacing every "/" with "_".
    """
    if ns.startswith('http://uri.etsi.org'):
        c = ns.split("/")
        # Indices 3, 5 and 6 are read unconditionally, so at least seven
        # components are required; shorter URIs used to raise IndexError
        # and now fall through to the generic conversion.
        if len(c) > 6:
            suffix = '_' + c[7] if len(c) > 7 else ''
            return f"ts_1{c[3]}{suffix}_{c[5]}_{c[6]}"
    return ns.replace("http://","").replace("/","_")
def convert_xsd_to_filename(xsd):
    """Return the JSON-schema file name that corresponds to an XSD path.

    Only the final path component is used; any directory part is dropped.
    A trailing ``.xsd`` extension is swapped for ``.schema.json``:

        'schemas/ts_103120_Core.xsd' -> 'ts_103120_Core.schema.json'

    Only the extension is replaced, so a name that also contains ".xsd"
    elsewhere (e.g. 'my.xsd_v2.xsd') keeps the rest of its stem intact.
    """
    name = Path(xsd).name
    old_suffix = '.xsd'
    if name.endswith(old_suffix):
        return name[:-len(old_suffix)] + '.schema.json'
    # Names without the extension keep the previous substring-replace
    # behaviour so existing callers see no change for such inputs.
    return name.replace(old_suffix, '.schema.json')
if __name__ == "__main__":
    # Command-line entry point: translate every XSD listed in a JSON config
    # file into a JSON schema file written to the configured output directory.
    #
    # Usage: translate_spec.py path_to_config_file
    #
    # Two keys are read from the config:
    #   "schemas" - mapping of XSD location -> per-schema settings dict
    #   "output"  - directory that receives the generated *.schema.json files
    if len(sys.argv) < 2:
        logging.error ("Usage: translate_spec.py path_to_config_file")
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to be defined (e.g. when run with `python -S`).
        sys.exit(-1)

    config = get_json(sys.argv[1])

    logging.info("Bulding ns map...")
    ns_map = {}
    for location, settings in config['schemas'].items():
        xs = XMLSchema(location, validation='skip')
        ns = xs.target_namespace
        # The per-schema settings are merged last, so a config entry can
        # override the derived "id" / "location" and add flags such as "skip".
        ns_map[ns] = {
            "id" : convert_ns_to_id(ns),
            "location" : str(Path(location).resolve())
        } | settings
    logging.debug(ns_map)

    logging.info("Building schema locations")
    schema_locations = [(k, v["location"]) for k,v in ns_map.items()]
    logging.debug(schema_locations)

    output_path = Path(config['output'])
    if not output_path.exists():
        logging.info("Creating output directory")
        # parents=True lets a nested output path be created in one go;
        # exist_ok=True tolerates the directory appearing after the check.
        output_path.mkdir(parents=True, exist_ok=True)

    logging.info("Translating schemas...")
    json_schemas = {}
    for schema_tuple in schema_locations:
        namespace, schema_location = schema_tuple
        logging.info(f" Translating {schema_tuple}")
        if ns_map[namespace].get('skip'):
            logging.info(f" Skipping {namespace} due to config...")
            # Without this the schema was announced as skipped but was
            # translated and written out anyway.
            continue

        js = translate_schema(schema_location, ns_map, schema_locations)

        # TODO - Special case, get rid of signature
        # NOTE(review): ns_map values are dicts, so this comparison with a
        # string is always False and the Signature property is never removed.
        # Left unchanged because the intended condition cannot be determined
        # from this file and enabling it would alter the generated schema.
        if ns_map[namespace] == 'core.json':
            js['$defs']['HI1Message']['properties'].pop('Signature')

        js_path = output_path / convert_xsd_to_filename(schema_location)

        # TODO - Special case - abstract HI1Object
        # Schemas whose resolved path contains "Core" get an extra
        # ConcreteHI1Object definition listing the concrete object types, and
        # every local reference to HI1Object is redirected to it below.
        is_core = "Core" in schema_location
        if is_core:
            js["$defs"]['ConcreteHI1Object'] = {
                'oneOf' : [
                    {'$ref' : 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'},
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LITaskObject'},
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LPTaskObject'},
                    {'$ref' : 'ts_103120_Task_2020_09#/$defs/LDTaskObject'},
                    {'$ref' : 'ts_103120_Document_2020_09#/$defs/DocumentObject'},
                    {'$ref' : 'ts_103120_Notification_2016_02#/$defs/NotificationObject'},
                    {'$ref' : 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'},
                    {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'},
                    {'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject'},
                ]
            }

        json_string = json.dumps(js, indent=2)
        if is_core:
            # Textual substitution on the serialised form; relies on the
            # '"$ref": "..."' spacing produced by json.dumps above.
            json_string = json_string.replace('"$ref": "#/$defs/HI1Object"', '"$ref": "#/$defs/ConcreteHI1Object"')

        with open(str(js_path), 'w', encoding="utf-8") as f:
            f.write(json_string)

        # Re-parse the written text so the stored schema includes the
        # reference substitution applied to the string form.
        json_schemas[js['$id']] = json.loads(json_string)