xsd_process.py 8.21 KB
Newer Older
canterburym's avatar
canterburym committed
1
import json
canterburym's avatar
canterburym committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import logging
from pathlib import Path

from xmlschema.etree import etree_tostring
from xmlschema import XMLSchema, XMLSchemaParseError


def BuildSchemaDictonary (fileList):
    """
    Builds the (namespace, location) hints used to resolve schema imports.

    Each file is loaded with validation skipped, only so that its namespace
    can be read. Files that fail to parse are logged and left out of the
    result.

    :param fileList: List of schema file paths (may be empty or None)
    :returns: List of (target namespace, absolute file path) tuples
    """
    if not fileList:
        logging.info("No schema files provided")
        return []

    logging.info("Schema locations:")
    schemaLocations = []
    for schemaFile in fileList:
        try:
            xs = XMLSchema(schemaFile, validation='skip')
        except XMLSchemaParseError as ex:
            logging.warning (" [ {0} failed to parse:  {1} ]".format(schemaFile, ex))
            continue
        # Key the hint on the namespace the schema defines (targetNamespace),
        # matching the script entry point; the default xmlns declared in the
        # file may be a different namespace altogether.
        namespace = xs.target_namespace
        schemaLocations.append((namespace, str(Path(schemaFile).resolve())))
        logging.info(" [ {0}  ->  {1} ]".format(namespace, schemaFile))
    return schemaLocations


def BuildSchema (coreFile, fileList = None):
    """
    Compiles the core schema, passing any supporting schemas as location hints.

    :param coreFile: Path to the core schema file
    :param fileList: Optional list of supporting schema files
    :returns: The XMLSchema instance built from coreFile
    """
    haveSupportingFiles = bool(fileList) and len(fileList) > 0
    locationHints = BuildSchemaDictonary(fileList) if haveSupportingFiles else []
    return XMLSchema(str(Path(coreFile)), locations=locationHints)


def ValidateXSDFiles (fileList):
    """
    Loads every schema in lax mode and collects the errors found in each.

    All files are first turned into location hints so that the schemas can
    import one another. A parse failure that originates in a different schema
    than the one being loaded is logged and suppressed (empty error list).

    :param fileList: List of schema file paths (may be empty or None)
    :returns: Dict mapping each schema file to its list of errors; entries are
        formatted strings, or the parse exception itself when loading failed
    """
    if not fileList:
        logging.info("No schema files provided")
        return {}

    schemaLocations = BuildSchemaDictonary(fileList)
    errors = {}

    logging.info("Schema validation:")
    for schemaFile in fileList:
        try:
            schema = XMLSchema(schemaFile, locations = schemaLocations, validation="lax")
            fileErrors = [f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}" for e in schema.all_errors]
            errors[schemaFile] = fileErrors
            # Lax mode collects errors instead of raising, so a successful
            # load is only "OK" when nothing was collected.
            if fileErrors:
                logging.warning(f"{schemaFile}: {len(fileErrors)} error(s) found")
            else:
                logging.info(f"{schemaFile}: OK")
        except XMLSchemaParseError as ex:
            logging.warning(f"{schemaFile}: Failed validation ({ex.message})")
            if (ex.schema_url) and (ex.schema_url != ex.origin_url):
                logging.warning("  Error comes from {0}, suppressing".format(ex.schema_url))
                errors[schemaFile] = []
            else:
                errors[schemaFile] = [ex]
    return errors


def ValidateAllXSDFilesInPath (path):
    """
    Validates every .xsd file found recursively below a directory.

    :param path: Directory to search
    :returns: The result of ValidateXSDFiles for the files found
    """
    xsdFiles = list(map(str, Path(path).rglob("*.xsd")))
    return ValidateXSDFiles(xsdFiles)


def ValidateInstanceDocuments (coreFile, supportingSchemas, instanceDocs):
    """
    Validates instance documents against a schema built from coreFile.

    :param coreFile: Path to the core schema file
    :param supportingSchemas: Supporting schema files passed to BuildSchema
    :param instanceDocs: List of instance documents to validate (may be
        empty or None)
    :returns: List with one message per document that failed validation;
        empty when every document passed or none were provided
    """
    if not instanceDocs:
        logging.warning ("No instance documents provided")
        return []

    schema = BuildSchema(coreFile, supportingSchemas)
    errors = []
    for instanceDoc in instanceDocs:
        try:
            schema.validate(instanceDoc)
        except Exception as ex:
            message = "{0} failed validation: {1}".format(instanceDoc, ex)
            logging.error (message)
            # Record the failure so the caller can see it in the return
            # value, not only in the log.
            errors.append(message)
        else:
            logging.info ("{0} passed validation".format(instanceDoc))
    return errors


def processResults (results, stageName):
    """
    Counts the number of errors and writes out the output per filename

    :param results: Dict mapping filename (str or Pathlib Path) to a dict
        with an 'ok' flag and a 'message'. The message is only read when
        'ok' is false, and is either a single value or a list of dicts that
        each carry a 'message' key
    :param stageName: Name to decorate the output with
    :returns: The number of files which had errors
    """
    rule = "-" * 60

    print("")
    errorCount = sum(1 for r in results.values() if not r['ok'])
    logging.info(f"{errorCount} {stageName} errors encountered")

    print(rule)
    print(f"{stageName} results:")
    print(rule)
    for filename, result in results.items():
        # Convert to str before padding: Path objects reject a format spec.
        print(f" {str(filename):.<55}{'..OK' if result['ok'] else 'FAIL'}")
        if result['ok']:
            continue
        message = result['message']
        if isinstance(message, list):
            for thing in message:
                print(f"    {thing['message']}")
        else:
            print(f"    {message}")

    print(rule)
    print(f"{stageName} errors: {errorCount}")
    print(rule)

    return errorCount


canterburym's avatar
canterburym committed
111
112
def syntaxCheckXSD (fileList):
    """
    Loads each schema on its own (validation skipped) and reports any errors.

    :param fileList: List of schema files (str or Pathlib Path)
    :returns: Dict mapping str(file) to a dict with an 'ok' flag and a
        'message': None when ok, a list of {'message': ...} dicts when the
        schema loaded with errors, or the repr of the parse exception when
        loading failed
    """
    results = {}
    for file in fileList:
        fileName = str(file)
        try:
            logging.info(f"Syntax checking {fileName}")

            schema = XMLSchema(fileName, validation="skip")
            # Read all_errors once and derive both fields from the same list.
            messages = [{'message' : f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}"} for e in schema.all_errors]
            results[fileName] = {
                'ok' : len(messages) == 0,
                'message' : messages if messages else None
            }
        except XMLSchemaParseError as ex:
            logging.warning(fileName + ": Failed validation ({0})".format(ex.message))
            results[fileName] = {
                'ok' : False,
                'message' : f"{ex!r}"
            }
    return results
canterburym's avatar
canterburym committed
129
130


canterburym's avatar
canterburym committed
131
132
133
134
135
136
137
138
139
140
141
142
143
if __name__ == '__main__':
    # Script entry point: compile every target schema listed in
    # testing/xsd_compile_targets.json, then validate each target's example
    # documents against its compiled schema. Exits non-zero on any failure.

    # sys.exit is used instead of exit(), which is a convenience installed by
    # the site module and is not guaranteed to exist.
    import sys

    #logging.basicConfig(level=logging.DEBUG)

    compileTargets = json.loads(Path('testing/xsd_compile_targets.json').read_text())

    # ---- Stage 1: compile each core schema with its supporting schemas ----
    results = {}
    for target in compileTargets:
        coreFile = target['coreSchema']
        logging.info(f"Attempting to compile {coreFile}")
        schemaLocations = []
        supportFailed = False
        for supportSchema in target['supportingSchemas']:
            logging.debug(f"Adding supporting schema {supportSchema}")
            try:
                xs = XMLSchema(supportSchema, validation='skip')
                schemaLocations.append((xs.target_namespace, str(Path(supportSchema).resolve())))
                # Log the namespace that was actually registered as the hint.
                logging.info(" [ {0}  ->  {1} ]".format(xs.target_namespace, supportSchema))
            except Exception as ex:
                logging.warning (" [ {0} exception parsing:  {1} ]".format(supportSchema, ex))
                results[coreFile] = {
                    'ok' : False,
                    'message' : f"{ex!r}"
                }
                supportFailed = True
                break
        if supportFailed:
            # Keep the supporting-schema failure as this target's result
            # rather than overwriting it with a compile attempt that is
            # missing one of its location hints.
            continue
        try:
            schema = XMLSchema(coreFile, locations = schemaLocations, validation="strict")
            compileErrors = [{'message' : f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}"} for e in schema.all_errors]
            results[coreFile] = {
                'ok' : len(compileErrors) == 0,
                'message' : compileErrors if compileErrors else None
            }
            target["schemaInstance"] = schema
        except Exception as ex:
            results[coreFile] = {
                'ok' : False,
                'message' : f"{ex!r}"
            }

    # Any compile failure stops the run here, so every target reaching the
    # next stage has a "schemaInstance".
    if processResults(results, "Compile") > 0:
        sys.exit(-1)

    # ---- Stage 2: validate the example files of each target ----
    results = {}

    for target in compileTargets:
        schema = target["schemaInstance"]
        testResults = {}
        failureCount = 0
        logging.info (f"Validating example {len(target['exampleFiles'])} entries for {target['coreSchema']}")
        for example in target["exampleFiles"]:
            examplePath = Path(example)
            if examplePath.is_dir():
                # A directory entry stands for every .xml file below it.
                logging.debug (f"Expanding {str(examplePath)}")
                testFiles = list(examplePath.rglob("*.xml"))
            else:
                testFiles = [examplePath]
            logging.debug(f"Found {len(testFiles)} test files")
            for test in testFiles:
                logging.debug(f"Validating {str(test)} against schema")
                try:
                    errors = list(schema.iter_errors(str(test)))
                    testResults[test] = [f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}" for e in errors]
                    failureCount += len(errors)
                except Exception as ex:
                    # A document that cannot be processed at all counts as
                    # a single failure.
                    testResults[test] = [f"{ex!r}"]
                    failureCount += 1
        results[target['coreSchema']] = {
            'ok' : failureCount == 0,
            'testResults' : testResults,
            'failureCount' : failureCount
        }

    # ---- Report ----
    rule = "-" * 75
    print(rule)
    print("Validation results:")
    print(rule)

    totalErrors = 0
    for filename, result in results.items():
        if len(result['testResults']) == 0:
            print (f"{filename:.<70}SKIP (0)")
            continue
        print (f"{filename:.<70}{'..OK' if result['ok'] else 'FAIL'} ({len(result['testResults'])})")
        totalErrors += result['failureCount']
        if result['failureCount'] > 0:
            for testFile, testResult in result['testResults'].items():
                print(f"  {str(testFile):.<65}{'..OK' if len(testResult) == 0 else 'FAIL'}")
                for tr in testResult:
                    print(f"    {tr}")

    print(rule)
    print(f"Validation errors: {totalErrors}")
    print(rule)

    # Exit status 1 when any validation error was found, 0 otherwise.
    sys.exit(1 if totalErrors > 0 else 0)