# xsd_process.py
import json
import logging
from pathlib import Path

from xmlschema.etree import etree_tostring
from xmlschema import XMLSchema, XMLSchemaParseError


def BuildSchemaDictonary (fileList):
    """
    Pairs each schema file with the default namespace it declares

    Every file is opened with validation skipped. A file that raises
    XMLSchemaParseError is logged as a warning and left out of the result.

    :param fileList: Collection of schema file paths
    :returns: List of (default namespace, resolved path) tuples; an empty
        list when fileList is empty
    """
    if not len(fileList):
        logging.info("No schema files provided")
        return []

    logging.info("Schema locations:")
    locations = []
    for path in fileList:
        try:
            parsed = XMLSchema(path, validation='skip')
            resolvedPath = str(Path(path).resolve())
            locations.append((parsed.default_namespace, resolvedPath))
            # The log line shows the path as given, not the resolved one
            logging.info(" [ {0}  ->  {1} ]".format(parsed.default_namespace, path))
        except XMLSchemaParseError as parseError:
            logging.warning (" [ {0} failed to parse:  {1} ]".format(path, parseError))
    return locations


def BuildSchema (coreFile, fileList = None):
    """
    Builds the core schema, handing it the locations of any supporting schemas

    :param coreFile: Path of the schema to build
    :param fileList: Optional collection of supporting schema file paths
    :returns: The XMLSchema instance built from coreFile
    """
    if not fileList or len(fileList) == 0:
        supportingLocations = []
    else:
        supportingLocations = BuildSchemaDictonary(fileList)

    corePath = str(Path(coreFile))
    return XMLSchema(corePath, locations=supportingLocations)


def ValidateXSDFiles (fileList):
    """
    Validates each schema file in lax mode, using the whole list as schema locations

    :param fileList: Collection of schema file paths (str)
    :returns: Dict keyed by schema file. Each value is a list holding either
        the formatted errors collected for that schema (empty when there are
        none), nothing at all when an XMLSchemaParseError was attributed to a
        different schema URL, or the XMLSchemaParseError itself otherwise.
        An empty dict is returned when fileList is empty.
    """
    if len(fileList) == 0:
        logging.info("No schema files provided")
        return {}

    schemaLocations = BuildSchemaDictonary(fileList)
    errors = {}

    logging.info("Schema validation:")
    for schemaFile in fileList:
        try:
            schema = XMLSchema(schemaFile, locations = schemaLocations, validation="lax")
            found = [f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}" for e in schema.all_errors]
            errors[schemaFile] = found
            # Only report OK once the collected errors have actually been
            # inspected; lax mode gathers errors instead of raising them.
            if found:
                logging.warning(schemaFile + ": {0} error(s) found".format(len(found)))
            else:
                logging.info(schemaFile + ": OK")
        except XMLSchemaParseError as ex:
            logging.warning(schemaFile + ": Failed validation ({0})".format(ex.message))
            if (ex.schema_url) and (ex.schema_url != ex.origin_url):
                # The failure is attributed to another schema, so it is not
                # held against this file.
                logging.warning("  Error comes from {0}, suppressing".format(ex.schema_url))
                errors[schemaFile] = []
            else:
                errors[schemaFile] = [ex]
    return errors


def ValidateAllXSDFilesInPath (path):
    """
    Validates every .xsd file found beneath path, subdirectories included

    :param path: Directory to search
    :returns: The result of ValidateXSDFiles for the files found
    """
    xsdFiles = list(map(str, Path(path).rglob("*.xsd")))
    return ValidateXSDFiles(xsdFiles)


def ValidateInstanceDocuments (coreFile, supportingSchemas, instanceDocs):
    """
    Validates each instance document against the schema built from coreFile

    :param coreFile: Path of the core schema
    :param supportingSchemas: Supporting schema file paths passed to BuildSchema
    :param instanceDocs: Collection of instance documents to validate
    :returns: List of the exceptions raised by documents that failed
        validation, in input order; empty when every document passed or
        when no documents were provided
    """
    if (instanceDocs is None) or len(instanceDocs) == 0:
        logging.warning ("No instance documents provided")
        return []

    schema = BuildSchema(coreFile, supportingSchemas)
    errors = []
    for instanceDoc in instanceDocs:
        try:
            schema.validate(instanceDoc)
            logging.info ("{0} passed validation".format(instanceDoc))
        except Exception as ex:
            # Broad on purpose: one bad document must not stop the rest
            # from being checked.
            logging.error ("{0} failed validation: {1}".format(instanceDoc, ex))
            # Record the failure so the caller can see it in the return value
            errors.append(ex)
    return errors


def processResults (results, stageName):
    """
    Counts the number of errors and writes out the output per filename

    :param results: Dict mapping filename (str or Pathlib Path) to a result
        dict with an 'ok' flag and a 'message'. The message is printed only
        for failures and may be a single value or a list of dicts that each
        carry their own 'message'.
    :param stageName: Name to decorate the output with
    :returns: The number of files which had errors
    """
    print("")
    errorCount = sum(1 for r in results.values() if not r['ok'])
    logging.info(f"{errorCount} {stageName} errors encountered")

    print(f"{'-':-<60}")
    print(f"{stageName} results:")
    print(f"{'-':-<60}")
    for filename, result in results.items():
        # str() first: a Path rejects the fill/align format spec with TypeError
        print(f" {str(filename):.<55}{'..OK' if result['ok'] else 'FAIL'}")
        if not result['ok']:
            if isinstance(result['message'], list):
                for thing in result['message']:
                    print(f"    {thing['message']}")
            else:
                print(f"    {result['message']}")

    print(f"{'-':-<60}")
    print(f"{stageName} errors: {errorCount}")
    print(f"{'-':-<60}")

    return errorCount


def syntaxCheckXSD (fileList):
    """
    Opens each schema file with validation skipped and records whether it loaded cleanly

    :param fileList: Collection of schema file paths (str or Pathlib Path)
    :returns: Dict mapping str(file) to a result dict. 'ok' is True when the
        schema reports no errors, in which case 'message' is None. Otherwise
        'message' is a list of {'message': text} dicts, or the repr of the
        XMLSchemaParseError raised while opening the file.
    """
    results = {}
    for file in fileList:
        try:
            logging.info(f"Syntax checking {str(file)}")
            schema = XMLSchema(str(file), validation="skip")
            # Read the error list once and reuse it for both fields
            schemaErrors = schema.all_errors
            results[str(file)] = {
                'ok' : len(schemaErrors) == 0,
                'message' : None if len(schemaErrors) == 0 else [{'message' : f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}"} for e in schemaErrors]
            }
        except XMLSchemaParseError as ex:
            logging.warning(str(file) + ": Failed validation ({0})".format(ex.message))
            results[str(file)] = {
                'ok' : False,
                'message' : f"{ex!r}"
            }
    return results


def findExampleFiles (examplePath):
    """
    Expands one example entry into the list of files to validate

    :param examplePath: Pathlib Path of an example file or of a directory of examples
    :returns: Every .xml file beneath examplePath when it is a directory
        (searched recursively), otherwise a one-item list holding
        examplePath itself
    """
    # is_dir must be called: the bare bound method is always truthy, which
    # would send single files down the directory branch and yield no files.
    if examplePath.is_dir():
        logging.debug (f"Expanding {str(examplePath)}")
        return list(examplePath.rglob("*.xml"))
    return [examplePath]


if __name__ == '__main__':
    # sys.exit is used rather than exit(), which is a helper added by the
    # site module and is not guaranteed to exist.
    import sys

    #logging.basicConfig(level=logging.DEBUG)

    # Stage 1: compile every core schema listed in the targets file
    compileTargets = json.loads(Path('testing/xsd_compile_targets.json').read_text())
    results = {}
    for target in compileTargets:
        coreFile = target['coreSchema']
        logging.info(f"Attempting to compile {coreFile}")
        schemaLocations = []
        supportFailed = False
        for supportSchema in target['supportingSchemas']:
            logging.debug(f"Adding supporting schema {supportSchema}")
            try:
                xs = XMLSchema(supportSchema, validation='skip')
                schemaLocations.append((xs.default_namespace, str(Path(supportSchema).resolve())))
                logging.info(" [ {0}  ->  {1} ]".format(xs.default_namespace, supportSchema))
            except Exception as ex:
                logging.warning (" [ {0} exception parsing:  {1} ]".format(supportSchema, ex))
                results[coreFile] = {
                    'ok' : False,
                    'message' : f"{ex!r}"
                }
                supportFailed = True
                break
        if supportFailed:
            # Keep the failure recorded above; compiling the core schema now
            # would overwrite it.
            continue
        try:
            schema = XMLSchema(coreFile, locations = schemaLocations, validation="strict")
            schemaErrors = schema.all_errors
            results[coreFile] = {
                'ok' : len(schemaErrors) == 0,
                'message' : None if len(schemaErrors) == 0 else [{'message' : f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}"} for e in schemaErrors]
            }
            target["schemaInstance"] = schema
        except Exception as ex:
            results[coreFile] = {
                'ok' : False,
                'message' : f"{ex!r}"
            }
            continue

    # Stage 2 needs a compiled schema for every target, so stop on any failure
    if (processResults(results, "Compile") > 0):
        sys.exit(-1)

    # Stage 2: validate each target's example files against its compiled schema
    results = {}

    for target in compileTargets:
        schema = target["schemaInstance"]
        testResults = {}
        failureCount = 0
        logging.info (f"Validating example {len(target['exampleFiles'])} entries for {target['coreSchema']}")
        for example in target["exampleFiles"]:
            testFiles = findExampleFiles(Path(example))
            logging.debug(f"Found {len(testFiles)} test files")
            for test in testFiles:
                logging.debug(f"Validating {str(test)} against schema")
                try:
                    errors = list(schema.iter_errors(str(test)))
                    testResults[test] = [f"{etree_tostring(e.elem, e.namespaces, '  ', 20)} - {e.message}" for e in errors]
                    failureCount += len(errors)
                except Exception as ex:
                    # A file that cannot be processed at all counts as one failure
                    testResults[test] = [f"{ex!r}"]
                    failureCount += 1
        results[target['coreSchema']] = {
            'ok' : failureCount == 0,
            'testResults' : testResults,
            'failureCount' : failureCount
        }

    print(f"{'-':-<75}")
    print("Validation results:")
    print(f"{'-':-<75}")

    totalErrors = 0
    for filename, result in results.items():
        print (f"{filename:.<70}{'..OK' if result['ok'] else 'FAIL'}")
        totalErrors += result['failureCount']
        for testFile, testResult in result['testResults'].items():
            print(f"  {str(testFile):.<65}{'..OK' if len(testResult) == 0 else 'FAIL'}")
            for tr in testResult:
                print(f"    {tr}")

    print(f"{'-':-<75}")
    print(f"Validation errors: {totalErrors}")
    print(f"{'-':-<75}")

    # Exit status 1 when any example failed validation, 0 otherwise
    sys.exit(1 if totalErrors > 0 else 0)