Commit 4972648e authored by Mark Canterbury's avatar Mark Canterbury
Browse files

Adding default $schema - should be switch

parent 452ad773
Loading
Loading
Loading
Loading
+35 −25
Original line number Diff line number Diff line
@@ -3,28 +3,29 @@ from pathlib import Path
from typing import Sequence, Tuple, Generator, List, Mapping
from json import load, loads, JSONDecodeError

from jsonschema import RefResolver, Draft202012Validator, Validator, RefResolutionError
from referencing import Registry
from jsonschema import Draft202012Validator, Validator #, RefResolver, RefResolutionError

from .types import BuildResult, SchemaValidationResult, InstanceValidationResult

def recursively_find_refs(j) -> Generator[Tuple[str, str], None, None]:
    for k,v in j.items():
        if k == "$ref":
            yield k,v
        if isinstance(v, dict):
            yield from recursively_find_refs(v)
# def recursively_find_refs(j) -> Generator[Tuple[str, str], None, None]:
#     for k,v in j.items():
#         if k == "$ref":
#             yield k,v
#         if isinstance(v, dict):
#             yield from recursively_find_refs(v)

def check_refs(schema: dict, resolver: RefResolver) -> List[RefResolutionError]:
    refs = list(recursively_find_refs(schema))
    errors = []
    for key, ref in refs:
        try:
            if ref.startswith("#"):
                continue
            resolved_ref = resolver.resolve(ref)
        except RefResolutionError as ex:
            errors.append(ex)
    return errors
# def check_refs(schema: dict, resolver: RefResolver) -> List[RefResolutionError]:
#     refs = list(recursively_find_refs(schema))
#     errors = []
#     for key, ref in refs:
#         try:
#             if ref.startswith("#"):
#                 continue
#             resolved_ref = resolver.resolve(ref)
#         except RefResolutionError as ex:
#             errors.append(ex)
#     return errors
        
def build_schema(core_schema_path: str | Path, supporting_schema_paths: Sequence[Path | str] = None) -> Tuple[Draft202012Validator | None, list[BuildResult]]:
    core_schema_path = Path(core_schema_path)
@@ -36,6 +37,8 @@ def build_schema(core_schema_path: str | Path, supporting_schema_paths: Sequence
    try:        
        _core_schema = load(core_schema_path.open())
        _schema_dict = { _core_schema['$id'] : _core_schema }
        if "$schema" not in _core_schema.keys():
            _core_schema["$schema"] = "https://json-schema.org/draft/2020-12/schema"
    except (JSONDecodeError, KeyError) as ex:
        logging.warning(f"Could not parse {core_schema_path} ({ex!r})")
        return None, [BuildResult(core_schema_path, False, [ex])]
@@ -54,6 +57,8 @@ def build_schema(core_schema_path: str | Path, supporting_schema_paths: Sequence
    for path in _supporting_paths:
        try:
            new_schema = load(path.open())
            if "$schema" not in new_schema.keys():
                new_schema["$schema"] = "https://json-schema.org/draft/2020-12/schema"
            _supporting_schemas.append(new_schema)
            new_id = new_schema['$id']
            _schema_dict[new_id] = new_schema
@@ -65,15 +70,19 @@ def build_schema(core_schema_path: str | Path, supporting_schema_paths: Sequence
                logging.warning(f"Could not parse file {path} ({ex!r})")
            build_results.append(BuildResult(path, False, [ex]))
    logging.info(f"Loaded schema IDs: {[k for k in _schema_dict.keys()]}")
    _resolver = RefResolver(None, 
                    referrer=None, 
                    store=_schema_dict)
    logging.info("Created RefResolver")
    _validator = Draft202012Validator(_core_schema, resolver=_resolver)
    # _resolver = RefResolver(None, 
    #                 referrer=None, 
    #                 store=_schema_dict)
    # logging.info("Created RefResolver")
    # _registry = Registry().with_resources([(k, v) for k, v in _schema_dict.items()])
    # _registry.crawl()
    # print (_registry)
    _registry = Registry().with_contents([(k, v) for k, v in _schema_dict.items()])
    _validator = Draft202012Validator(_core_schema, resolver=None, format_checker=None, registry=_registry)
    logging.info("Created validator")
    ref_errors = []
    for schema in _supporting_schemas + [_validator.schema]:
        ref_errors += check_refs(schema, _resolver)
    # for schema in _supporting_schemas + [_validator.schema]:
    #     ref_errors += check_refs(schema, _resolver)
    if len(ref_errors) > 0:
        build_results.append(BuildResult(core_schema_path, False, ref_errors))
        return None, build_results
@@ -102,6 +111,7 @@ def validate_path(schema: Validator, path: Path | str) -> InstanceValidationResu
    try:
        path = Path(path)
        string = path.open().read()
        print (f"Validating {path}")
        return validate_string(schema, string, path)
    except FileNotFoundError as ex:
        return InstanceValidationResult(path, False, [ex])