Newer
Older
import json
import os
import logging
from pathlib import Path
import sys
from xmlschema import *
from translate import *
logging.basicConfig(level = logging.INFO)
json_signature_struct = {
"properties" : {
"protected" : { "type" : "string" },
"signature" : { "type" : "string" }
},
"required" : ["protected", "signature" ]
}
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def build_schema_locations (paths):
schema_locations = []
for schemaFile in paths:
try:
xs = XMLSchema(schemaFile, validation='skip')
schema_locations.append((xs.target_namespace, str(Path(schemaFile).resolve())))
logging.debug (" [ {0} -> {1} ]".format(xs.target_namespace, schemaFile))
except XMLSchemaParseError as ex:
logging.debug (" [ {0} failed to parse: {1} ]".format(schemaFile, ex))
return schema_locations
def get_json(filename):
with open(filename) as f:
j = json.load(f)
return j
def convert_ns_to_id (ns):
if ns.startswith('http://uri.etsi.org'):
c = ns.split("/")
return f"ts_1{c[3]}{'_' + c[7] if len(c) > 7 else ''}_{c[5]}_{c[6]}"
else:
return ns.replace("http://","").replace("/","_")
def convert_xsd_to_filename (xsd):
f = Path(xsd)
return f.name.replace('.xsd', '.schema.json')
if __name__ == "__main__":
if len(sys.argv) < 2:
logging.error ("Usage: translate_spec.py path_to_config_file")
exit(-1)
config = get_json(sys.argv[1])
logging.info("Bulding ns map...")
ns_map = {}
for location, settings in config['schemas'].items():
xs = XMLSchema(location, validation='skip')
ns = xs.target_namespace
id = convert_ns_to_id(ns)
ns_map[ns] = {
"id" : id,
"location" : str(Path(location).resolve())
} | settings
logging.debug(ns_map)
logging.info("Building schema locations")
schema_locations = [(k, v["location"]) for k,v in ns_map.items()]
logging.debug(schema_locations)
output_path = Path(config['output'])
if not output_path.exists():
logging.info("Creating output directory")
os.mkdir(str(output_path))
logging.info("Translating schemas...")
json_schemas = {}
for schema_tuple in schema_locations:
logging.info(f" Translating {schema_tuple}")
if 'skip' in ns_map[schema_tuple[0]]:
logging.info(f" Skipping {schema_tuple[0]}...")
js = translate_schema(schema_tuple[1], ns_map, schema_locations)
# TODO - Special case, get rid of XML Dsig signature and insert JSON signature
if schema_tuple[0] == 'http://uri.etsi.org/03120/common/2019/10/Core':
logging.info ("Modifying signature elements")
js['$defs']['HI1Message']['properties'].pop('xmldsig:Signature')
js['$defs']['HI1Message']['properties']['Signature'] = json_signature_struct
js_path = output_path / convert_xsd_to_filename(schema_tuple[1])
# TODO - Special case - abstract HI1Object
if "Core" in schema_tuple[1]:
js["$defs"]['ConcreteHI1Object'] = {
'oneOf' : [
{'$ref' : 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'},
{'$ref' : 'ts_103120_Task_2020_09#/$defs/LITaskObject'},
{'$ref' : 'ts_103120_Task_2020_09#/$defs/LPTaskObject'},
{'$ref' : 'ts_103120_Task_2020_09#/$defs/LDTaskObject'},
{'$ref' : 'ts_103120_Document_2020_09#/$defs/DocumentObject'},
{'$ref' : 'ts_103120_Notification_2016_02#/$defs/NotificationObject'},
{'$ref' : 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'},
{'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'},
{'$ref' : 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficRuleObject'},
]
}
json_string = json.dumps(js, indent=2)
if "Core" in schema_tuple[1]:
json_string = json_string.replace('"$ref": "#/$defs/HI1Object"', '"$ref": "#/$defs/ConcreteHI1Object"')
with open(str(js_path), 'w') as f:
f.write(json_string)
json_schemas[js['$id']] = json.loads(json_string)