Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
import os
import logging
from pathlib import Path
import sys
from xmlschema import *
from translate import *
import jsonschema
# Configure the root logger so INFO-and-above messages are emitted by default.
logging.basicConfig(level=logging.INFO)
def build_schema_locations(paths):
    """Pair each parseable XSD file with its target namespace.

    Every path is loaded with ``XMLSchema(path, validation='skip')``. A file
    that raises ``XMLSchemaParseError`` is reported and left out of the result.

    Args:
        paths: Iterable of XSD file paths.

    Returns:
        A list of ``(target_namespace, resolved_path)`` tuples in input order,
        where ``resolved_path`` is the absolute path rendered as a string.
    """
    schema_locations = []
    for schema_file in paths:
        try:
            xs = XMLSchema(schema_file, validation='skip')
        except XMLSchemaParseError as ex:
            # Logged as a warning: the schema is dropped from the result, and
            # at DEBUG level that would be invisible under the INFO default.
            logging.warning(" [ %s failed to parse: %s ]", schema_file, ex)
        else:
            resolved = str(Path(schema_file).resolve())
            schema_locations.append((xs.target_namespace, resolved))
            logging.debug(" [ %s -> %s ]", xs.target_namespace, schema_file)
    return schema_locations
def get_json(filename):
    """Load and return the JSON document stored in *filename*.

    The file is decoded as UTF-8 explicitly instead of relying on the
    platform's locale encoding, so non-ASCII content loads the same way on
    every system.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded JSON value (dict, list, str, number, bool or None).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(filename, encoding="utf-8") as f:
        return json.load(f)
def convert_ns_to_id(ns):
    """Derive a JSON Schema identifier from an XML namespace URI.

    Namespaces under ``http://uri.etsi.org`` with at least seven
    "/"-separated parts are rendered as ``ts_1<spec>[_<name>]_<year>_<month>``,
    taking the spec number from part 3, year and month from parts 5 and 6,
    and the optional name from part 7. For example
    ``http://uri.etsi.org/03120/common/2020/09/Core`` becomes
    ``ts_103120_Core_2020_09``.

    Any other namespace, including ETSI namespaces too short for that layout
    (e.g. ``http://uri.etsi.org/01903/v1.3.2#``), has its ``http://`` prefix
    removed and every "/" replaced by "_".

    Args:
        ns: Namespace URI string.

    Returns:
        The identifier string.
    """
    if ns.startswith('http://uri.etsi.org'):
        c = ns.split("/")
        # Parts 5 and 6 are required below; shorter ETSI namespaces fall
        # through to the generic conversion instead of raising IndexError.
        if len(c) >= 7:
            name_part = f"_{c[7]}" if len(c) > 7 else ""
            return f"ts_1{c[3]}{name_part}_{c[5]}_{c[6]}"
    return ns.replace("http://", "").replace("/", "_")
def convert_xsd_to_filename(xsd):
    """Return the JSON Schema file name that corresponds to an XSD path.

    Only the final path component is kept, and each ``.xsd`` occurrence in it
    is replaced with ``.schema.json``.
    """
    base_name = Path(xsd).name
    return base_name.replace('.xsd', '.schema.json')
if __name__ == "__main__":
    # Script entry point. Reads a JSON config file (first CLI argument) with a
    # 'schemas' list of XSD paths and an 'output' directory, translates each
    # XSD to JSON Schema via translate_schema, and writes one
    # '<name>.schema.json' file per schema into the output directory.
    if len(sys.argv) < 2:
        logging.error("Usage: translate_spec.py path_to_config_file")
        # sys.exit instead of the bare exit() helper, which is only injected
        # by the site module and is not guaranteed to exist.
        sys.exit(-1)

    config = get_json(sys.argv[1])
    schema_paths = config['schemas']

    logging.info("Building schema locations...")
    schema_locations = build_schema_locations(schema_paths)
    logging.debug(schema_locations)

    # ns_to_id_map = {
    #     schema[0]: schema[0].split('/')[-1].lower() + ".json"
    #     for schema in schema_locations if '03120' in schema[0]
    # } | {
    #     'http://uri.etsi.org/03280/common/2017/07' : 'etsi103280.json',
    #     'http://www.w3.org/2000/09/xmldsig#' : 'xmldsig.json',
    # }
    logging.info("Constructing XSD NS to JSON Schema ID mapping...")
    ns_to_id_map = {ns: convert_ns_to_id(ns) for ns, _ in schema_locations}
    logging.debug(ns_to_id_map)

    # # js = translate_schema("103280/TS_103_280.xsd", "103120.json")
    # # print(json.dumps(js, indent=2))

    output_path = Path(config['output'])
    if not output_path.exists():
        logging.info("Creating output directory")
        # parents=True also creates missing intermediate directories;
        # exist_ok=True tolerates the directory appearing after the check.
        output_path.mkdir(parents=True, exist_ok=True)

    logging.info("Translating schemas...")
    json_schemas = {}
    for schema_tuple in schema_locations:
        namespace, xsd_path = schema_tuple
        logging.info(f" Translating {schema_tuple}")
        if 'xmldsig' in xsd_path:
            # TODO - work out what to do here
            logging.info(" Skipping XML Dsig...")
            continue

        js = translate_schema(xsd_path, ns_to_id_map, schema_locations)

        # NOTE(review): convert_ns_to_id in this file does not appear to
        # produce 'core.json' for any ETSI namespace, so this branch looks
        # like a leftover from the commented-out mapping above. Confirm
        # whether the Signature property should still be removed.
        if ns_to_id_map[namespace] == 'core.json':
            js['$defs']['HI1Message']['properties'].pop('Signature')

        js_path = output_path / convert_xsd_to_filename(xsd_path)
        is_core = "Core" in xsd_path

        # TODO - work out how to do this substitution automatically
        # and build the graph of acceptable descendent types
        if is_core:
            js["$defs"]['ConcreteHI1Object'] = {
                'oneOf': [
                    {'$ref': 'ts_103120_Authorisation_2020_09#/$defs/AuthorisationObject'},
                    {'$ref': 'ts_103120_Task_2020_09#/$defs/LITaskObject'},
                    {'$ref': 'ts_103120_Task_2020_09#/$defs/LDTaskObject'},
                    {'$ref': 'ts_103120_Document_2020_09#/$defs/DocumentObject'},
                    {'$ref': 'ts_103120_Notification_2016_02#/$defs/NotificationObject'},
                    {'$ref': 'ts_103120_Delivery_2019_10#/$defs/DeliveryObject'},
                    {'$ref': 'ts_103120_TrafficPolicy_2022_07#/$defs/TrafficPolicyObject'},
                ]
            }

        json_string = json.dumps(js, indent=2)
        if is_core:
            # Textual substitution on the serialised schema: every local
            # reference to HI1Object is pointed at ConcreteHI1Object instead.
            json_string = json_string.replace(
                '"$ref": "#/$defs/HI1Object"',
                '"$ref": "#/$defs/ConcreteHI1Object"',
            )

        with open(js_path, 'w', encoding='utf-8') as f:
            f.write(json_string)
        # Re-parse so the stored schema reflects the substitution above.
        json_schemas[js['$id']] = json.loads(json_string)

    # else:
    #     json_schemas = {}
    #     json_path = Path('json/')
    #     for json_file in json_path.glob("*.json"):
    #         json_schemas[json_file.name] = get_json(json_file)
    # resolver = jsonschema.RefResolver("", "", json_schemas)
    # instance = get_json("120.json")
    # schema = json_schemas['core.json']
    # jsonschema.validate(instance, schema, resolver=resolver)
    # print(json.dumps(js, indent=2))