"""Compile XSD schemas and validate example XML documents using xmlschema.

When run as a script, reads ``testing/xsd_compile_targets.json``, compiles
each listed core schema (with its supporting schemas as location hints) and
then validates the example files listed for each target.
"""

import json
import logging
import sys
from pathlib import Path

from xmlschema.etree import etree_tostring
from xmlschema import XMLSchema, XMLSchemaParseError


def _formatSchemaError(error):
    """Render one xmlschema error as '<element snippet> - <message>'."""
    return f"{etree_tostring(error.elem, error.namespaces, ' ', 20)} - {error.message}"


def _failureResult(ex):
    """Build the result entry used when processing raised an exception."""
    return {'ok': False, 'message': f"{ex!r}"}


def _schemaResult(schema):
    """Build an {'ok', 'message'} result entry from a schema's collected errors.

    'message' is None when there are no errors, otherwise a list of
    {'message': str} dicts (the shape processResults knows how to print).
    """
    schemaErrors = schema.all_errors
    if not schemaErrors:
        return {'ok': True, 'message': None}
    return {
        'ok': False,
        'message': [{'message': _formatSchemaError(e)} for e in schemaErrors],
    }


def BuildSchemaDictonary(fileList):
    """
    Builds the list of (namespace, absolute path) location hints for a set of schemas

    Files that fail to parse are logged and left out of the result.

    :param fileList: Iterable of schema file paths
    :returns: List of (target namespace, resolved file path) tuples
    """
    if not fileList:
        logging.info("No schema files provided")
        return []

    logging.info("Schema locations:")
    schemaLocations = []
    for schemaFile in fileList:
        try:
            xs = XMLSchema(schemaFile, validation='skip')
            # Location hints are keyed by the namespace a schema *defines*,
            # i.e. its target namespace (the default xmlns may differ or be absent).
            namespace = xs.target_namespace
            schemaLocations.append((namespace, str(Path(schemaFile).resolve())))
            logging.info(" [ {0} -> {1} ]".format(namespace, schemaFile))
        except XMLSchemaParseError as ex:
            logging.warning(" [ {0} failed to parse: {1} ]".format(schemaFile, ex))
    return schemaLocations


def BuildSchema(coreFile, fileList=None):
    """
    Compiles the core schema, using the supporting schemas as location hints

    :param coreFile: Path of the schema to compile
    :param fileList: Optional iterable of supporting schema file paths
    :returns: The compiled XMLSchema instance
    """
    schemaLocations = BuildSchemaDictonary(fileList) if fileList else []
    return XMLSchema(str(Path(coreFile)), locations=schemaLocations)


def ValidateXSDFiles(fileList):
    """
    Validates each schema file (lax mode) against the full set of schemas

    :param fileList: Iterable of schema file paths
    :returns: Dict mapping each file to its list of errors. Parse errors
              that originate from a different schema file are suppressed
              (empty list); otherwise the list holds the exception itself.
    """
    if not fileList:
        logging.info("No schema files provided")
        return {}

    schemaLocations = BuildSchemaDictonary(fileList)
    errors = {}

    logging.info("Schema validation:")
    for schemaFile in fileList:
        try:
            schema = XMLSchema(schemaFile, locations=schemaLocations, validation="lax")
            messages = [_formatSchemaError(e) for e in schema.all_errors]
            # Lax validation collects errors instead of raising, so only
            # report OK when nothing was collected.
            if messages:
                logging.warning(schemaFile + ": {0} error(s)".format(len(messages)))
            else:
                logging.info(schemaFile + ": OK")
            errors[schemaFile] = messages
        except XMLSchemaParseError as ex:
            logging.warning(schemaFile + ": Failed validation ({0})".format(ex.message))
            if (ex.schema_url) and (ex.schema_url != ex.origin_url):
                # The fault lies in another schema, which reports it itself.
                logging.warning(" Error comes from {0}, suppressing".format(ex.schema_url))
                errors[schemaFile] = []
            else:
                errors[schemaFile] = [ex]
    return errors


def ValidateAllXSDFilesInPath(path):
    """
    Validates every *.xsd file found recursively under the given path

    :param path: Directory to search
    :returns: Same as ValidateXSDFiles
    """
    schemaGlob = [str(f) for f in Path(path).rglob("*.xsd")]
    return ValidateXSDFiles(schemaGlob)


def ValidateInstanceDocuments(coreFile, supportingSchemas, instanceDocs):
    """
    Validates instance documents against the compiled core schema

    :param coreFile: Path of the core schema
    :param supportingSchemas: Iterable of supporting schema paths (may be None)
    :param instanceDocs: Iterable of instance document paths
    :returns: List of (instanceDoc, exception) tuples, one per document
              that failed validation; empty if all passed
    """
    if not instanceDocs:
        logging.warning("No instance documents provided")
        return []

    schema = BuildSchema(coreFile, supportingSchemas)
    errors = []
    for instanceDoc in instanceDocs:
        try:
            schema.validate(instanceDoc)
            logging.info("{0} passed validation".format(instanceDoc))
        except Exception as ex:
            logging.error("{0} failed validation: {1}".format(instanceDoc, ex))
            # Record the failure so callers can see it in the return value.
            errors.append((instanceDoc, ex))
    return errors


def processResults(results, stageName):
    """
    Counts the number of errors and writes out the output per filename

    :param results: Dict mapping filename to a result dict with keys 'ok'
                    (bool) and 'message' (str, or list of {'message': str})
    :param stageName: Name to decorate the output with
    :returns: The number of files which had errors
    """
    print("")
    errorCount = sum(1 for r in results.values() if not r['ok'])
    logging.info(f"{errorCount} {stageName} errors encountered")

    print(f"{'-':-<60}")
    print(f"{stageName} results:")
    print(f"{'-':-<60}")
    for filename, result in results.items():
        print(f" {filename:.<55}{'..OK' if result['ok'] else 'FAIL'}")
        if result['ok']:
            continue
        if isinstance(result['message'], list):
            for thing in result['message']:
                print(f" {thing['message']}")
        else:
            print(f" {result['message']}")

    print(f"{'-':-<60}")
    print(f"{stageName} errors: {errorCount}")
    print(f"{'-':-<60}")

    return errorCount


def syntaxCheckXSD(fileList):
    """
    Parses each schema file without validation and collects any errors

    :param fileList: Iterable of schema file paths (str or Pathlib Path)
    :returns: Dict mapping str(file) to a result dict as used by processResults
    """
    results = {}
    for schemaFile in fileList:
        name = str(schemaFile)
        try:
            logging.info(f"Syntax checking {name}")
            schema = XMLSchema(name, validation="skip")
            results[name] = _schemaResult(schema)
        except XMLSchemaParseError as ex:
            logging.warning(name + ": Failed validation ({0})".format(ex.message))
            results[name] = _failureResult(ex)
    return results


def _compileTarget(target):
    """Compile one target's core schema; return its result entry.

    On success the compiled schema is stored in target["schemaInstance"].
    """
    coreFile = target['coreSchema']
    logging.info(f"Attempting to compile {coreFile}")

    schemaLocations = []
    for supportSchema in target['supportingSchemas']:
        logging.debug(f"Adding supporting schema {supportSchema}")
        try:
            xs = XMLSchema(supportSchema, validation='skip')
            schemaLocations.append((xs.target_namespace, str(Path(supportSchema).resolve())))
            logging.info(" [ {0} -> {1} ]".format(xs.target_namespace, supportSchema))
        except Exception as ex:
            logging.warning(" [ {0} exception parsing: {1} ]".format(supportSchema, ex))
            # Keep this failure as the target's result rather than letting
            # a later core-schema compile overwrite it.
            return _failureResult(ex)

    try:
        schema = XMLSchema(coreFile, locations=schemaLocations, validation="strict")
        result = _schemaResult(schema)
        target["schemaInstance"] = schema
        return result
    except Exception as ex:
        return _failureResult(ex)


def _validateExamples(target):
    """Validate all example files of one target against its compiled schema."""
    schema = target["schemaInstance"]
    testResults = {}
    failureCount = 0
    logging.info(f"Validating example {len(target['exampleFiles'])} entries for {target['coreSchema']}")

    for example in target["exampleFiles"]:
        examplePath = Path(example)
        if examplePath.is_dir():
            logging.debug(f"Expanding {str(examplePath)}")
            testFiles = list(examplePath.rglob("./*.xml"))
        else:
            testFiles = [examplePath]
        logging.debug(f"Found {len(testFiles)} test files")

        for test in testFiles:
            logging.debug(f"Validating {str(test)} against schema")
            try:
                errors = list(schema.iter_errors(str(test)))
                testResults[test] = [_formatSchemaError(e) for e in errors]
                failureCount += len(errors)
            except Exception as ex:
                # e.g. unreadable or malformed XML: counts as a single failure
                testResults[test] = [f"{ex!r}"]
                failureCount += 1

    return {
        'ok': failureCount == 0,
        'testResults': testResults,
        'failureCount': failureCount,
    }


def _printValidationResults(results):
    """Print the per-schema / per-file validation report; return total errors."""
    print(f"{'-':-<75}")
    print("Validation results:")
    print(f"{'-':-<75}")

    totalErrors = 0
    for filename, result in results.items():
        testResults = result['testResults']
        if not testResults:
            print(f"{filename:.<70}SKIP (0)")
            continue

        print(f"{filename:.<70}{'..OK' if result['ok'] else 'FAIL'} ({len(testResults)})")
        totalErrors += result['failureCount']
        if result['failureCount'] > 0:
            for testFile, testResult in testResults.items():
                print(f" {str(testFile):.<65}{'..OK' if len(testResult) == 0 else 'FAIL'}")
                for tr in testResult:
                    print(f" {tr}")

    print(f"{'-':-<75}")
    print(f"Validation errors: {totalErrors}")
    print(f"{'-':-<75}")
    return totalErrors


def main():
    """Compile all targets, then validate their examples; exit non-zero on errors."""
    # Uncomment for verbose output:
    #logging.basicConfig(level=logging.DEBUG)

    compileTargets = json.loads(Path('testing/xsd_compile_targets.json').read_text())

    compileResults = {target['coreSchema']: _compileTarget(target) for target in compileTargets}
    if processResults(compileResults, "Compile") > 0:
        # Every target must compile before examples can be validated.
        sys.exit(-1)

    validationResults = {target['coreSchema']: _validateExamples(target) for target in compileTargets}
    totalErrors = _printValidationResults(validationResults)

    sys.exit(1 if totalErrors > 0 else 0)


if __name__ == '__main__':
    main()