Skip to content
checks.py 37 KiB
Newer Older
from http import HTTPStatus


class Checks:
    def __init__(self):
        self.checks = {
lopezaguilar's avatar
lopezaguilar committed
            'Check Response Status Code':
                Checks.check_response_status_code,
            'Check Response Body Containing Array Of URIs set to':
                Checks.check_response_body_containing_array_of_uris_set_to,
            'Check Created Resources Set To':
                Checks.check_created_resources_set_to,
            'Check Response Headers Containing Content-Type set to':
                Checks.check_response_headers_containing_content_type_set_to,
            'Check Response Headers Link Not Empty':
                Checks.check_response_headers_link_not_empty,
            'Check Response Headers Containing URI set to':
                Checks.check_response_headers_containing_uri_set_to,
            'Check Response Headers ID Not Empty':
                Checks.check_response_headers_id_not_empty,
            'Check Response Body Containing an Attribute set to':
                Checks.check_response_body_containing_an_attribute_set_to,
            'Check Response Body Containing Entity element':
                Checks.check_response_body_containing_entity_element,
            'Check Response Body Containing List Containing Entity Elements':
                Checks.check_response_body_containing_list_containing_entity_elements,
            'Check Response Body Containing List Containing Entity Elements With Different Types':
                Checks.check_response_body_containing_list_containing_entity_elements_with_different_types,
            'Check Response Body Containing EntityTemporal element':
                Checks.check_response_body_containing_entitytemporal_element,
            'Check Response Body Containing List Containing EntityTemporal elements':
                Checks.check_response_body_containing_list_containing_entitytemporal_elements,
            'Check Response Body Containing Subscription element':
                Checks.check_response_body_containing_subscription_element,
            'Check Response Body Containing List Containing Subscription elements':
lopezaguilar's avatar
lopezaguilar committed
                Checks.check_response_body_containing_list_containing_subscription_elements,
            'Check Response Body Containing Number Of Entities':
                Checks.check_response_body_containing_number_of_entities,
            'Check Response Body Containing Context Source Registration element':
                Checks.check_response_body_containing_context_source_registration_element,
            'Check Response Body Containing EntityTypeList element':
                Checks.check_response_body_containing_entitytypelist_element,
            'Check Response Body Containing EntityType element':
                Checks.check_response_body_containing_entitytype_element,
            'Check Response Body Containing EntityTypeInfo element':
                Checks.check_response_body_containing_entitytypeinfo_element,
            'Check Response Body Containing AttributeList element':
lopezaguilar's avatar
lopezaguilar committed
                Checks.check_response_body_containing_attributelist_element,
            'Check Response Body Containing Attribute element':
                Checks.check_response_body_containing_attribute_element,
            'Check Response Body Containing List Containing Context Source Registrations elements':
                Checks.check_response_body_containing_list_containing_context_source_registrations_elements,
            'Check Response Body Type When Using Session Request':
                Checks.check_response_body_type_when_using_session_request,
            'Check Response Body Containing ProblemDetails Element Containing Type Element set to':
                Checks.check_response_body_containing_problemdetails_element_containing_type_element_set_to,
            'Check Response Body Title When Using Session Request':
                Checks.check_response_body_title_when_using_session_request,
            'Check Response Body Containing ProblemDetails Element Containing Title Element':
                Checks.check_response_body_containing_problemdetails_element_containing_title_element,
            'Check RL Response Body Containing ProblemDetails Element Containing Type Element set to':
lopezaguilar's avatar
lopezaguilar committed
                Checks.check_rl_response_body_containing_problemdetails_element_containing_type_element_set_to,
            'Check RL Response Body Containing ProblemDetails Element Containing Title Element':
                Checks.check_rl_response_body_containing_problemdetails_element_containing_title_element,
            'Check JSON Value In Response Body':
                Checks.check_json_value_in_response_body,
            'Check Pagination Prev And Next Headers':
                Checks.check_pagination_prev_and_next_headers,
            'Check Created Resource Set To':
                Checks.check_created_resource_set_to,
            'Check Updated Resource Set To':
                Checks.check_updated_resource_set_to,
            'Check Updated Resources Set To':
                Checks.check_updated_resources_set_to,
lopezaguilar's avatar
lopezaguilar committed
            'Check SUT Not Containing Resource':
                Checks.check_sut_not_containing_resource,
            'Check SUT Not Containing Resources':
                Checks.check_sut_not_containing_resources,
            'Check NotificationParams':
lopezaguilar's avatar
lopezaguilar committed
                Checks.check_notificationparams,
            'Check Response Body Containing Batch Operation Result':
lopezaguilar's avatar
lopezaguilar committed
                Checks.check_response_body_containing_batch_operation_result,
lopezaguilar's avatar
lopezaguilar committed
            'Wait for notification':
                Checks.wait_for_notification,
            'Wait for no notification':
                Checks.wait_for_no_notification,
            'Wait for notification and validate it':
                Checks.wait_for_notification_and_validate_it,
lopezaguilar's avatar
lopezaguilar committed
            'Should be Equal':
lopezaguilar's avatar
lopezaguilar committed
                Checks.should_be_equal,
            'Dictionary Should Contain Key':
                Checks.dictionary_should_contain_key,
            'Should Not Be Empty':
                Checks.should_not_be_empty,
            'Should be True':
                Checks.should_be_true,
lopezaguilar's avatar
lopezaguilar committed
            'Check Response Body Content':
                Checks.check_response_body_content
lopezaguilar's avatar
lopezaguilar committed
        }

        self.args = {
            'Check Response Status Code': {
                'params': ['status_code'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0]
            },
            'Check Response Body Containing ProblemDetails Element Containing Type Element set to': {
                'params': ['type'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [1]
            },
            'Check Response Headers Containing Content-Type set to': {
                'params': ['content_type'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0]
            },
            'Check Response Body Containing an Attribute set to': {
lopezaguilar's avatar
lopezaguilar committed
                'params': ['expected_attribute_name', 'response_body', 'expected_attribute_value'],
                'position': []
            },
            'Check Response Body Containing List Containing EntityTemporal elements': {
                'params': ['filename', 'entity_ids'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1]
            'Check Response Body Containing List Containing Subscription elements': {
lopezaguilar's avatar
lopezaguilar committed
                'params': ['file', 'id', 'response'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1, 2]
            },
            'Check Response Body Containing Number Of Entities': {
lopezaguilar's avatar
lopezaguilar committed
                'params': ['entity_type', 'number_entities', 'response'],
                'position': [0, 1, 2]
            },
            'Check Response Body Containing Context Source Registration element': {
lopezaguilar's avatar
lopezaguilar committed
                'params': ['file', 'id', 'response'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1, 2]
            },
            'Check Response Body Containing EntityTypeList element': {
                'params': ['filename', 'response'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1]
            },
            'Check Response Body Containing EntityType element': {
                'params': ['filename', 'response'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1]
            },
            'Check Response Body Type When Using Session Request': {
                'params': ['type'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [1]
            },
            'Check RL Response Body Containing ProblemDetails Element Containing Type Element set to': {
                'params': ['type'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [1]
            },
            'Check JSON Value In Response Body': {
                'params': ['key', 'value'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1]
            },
            'Check Pagination Prev And Next Headers': {
                'params': ['previous', 'next'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [1, 2]
            },
            'Check Updated Resources Set To': {
                'params': ['number_entities'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0]
            },
            'Check SUT Not Containing Resource': {
                'params': ['status_code'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0]
            },
            'Check NotificationParams': {
lopezaguilar's avatar
lopezaguilar committed
                'params': ['filename', 'expected_additional_members', 'response_body'],
lopezaguilar's avatar
lopezaguilar committed
            },
            'Check Response Body Containing Batch Operation Result': {
                'params': ['operation'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0]
lopezaguilar's avatar
lopezaguilar committed
            },
            'Should be Equal': {
                'params': ['expected_value', 'obtained_value'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1]
lopezaguilar's avatar
lopezaguilar committed
            },
            'Check Response Body Containing Subscription element': {
                'params': ['filename', 'subscription_id', 'response_body'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1, 2]
lopezaguilar's avatar
lopezaguilar committed
            },
            'Wait for notification': {
                'params': ['timeout'],
                'position': []
            'Wait for notification and validate it': {
                'params': ['expected_subscription_id', 'expected_context_source_registration_ids',
                           'expected_trigger_reason', 'expected_notification_data_entities',
                           'timeout'],
                'position': []
            },
            'Wait for no notification': {
                'params': ['timeout'],
                'position': []
            },
            'Check Response Body Containing AttributeList element': {
                'params': ['filename', 'response'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1]
            },
            'Check Response Body Containing Entity element': {
                'params': ['filename', 'id', 'response'],
lopezaguilar's avatar
lopezaguilar committed
                'position': [0, 1, 2]
lopezaguilar's avatar
lopezaguilar committed
            'Check Response Body Content': {
                'params': ['expectation_filename', 'response_body', 'additional_ignored_path'],
                'position': []
            },
            'Dictionary Should Contain Key': {
                'params': ['dictionary', 'key'],
                'position': [0, 1]
            },
            'Should Not Be Empty': {
                'params': ['variable'],
                'position': [0]
            },
            'Should be True': {
                'params': ['expression'],
                'position': [0]
lopezaguilar's avatar
lopezaguilar committed
            }
        }

    @staticmethod
    def check_response_status_code(kwargs: list) -> str:
        if "status_code" in kwargs:
            status_code = kwargs['status_code']
            try:
                return f'Response Status Code set to {status_code} ({HTTPStatus(status_code).phrase})'
            except ValueError:
                return f'Response Status Code set to {status_code}'
        else:
            raise Exception(f'ERROR, Expected status_code parameter but received: {kwargs}')

lopezaguilar's avatar
lopezaguilar committed
    def wait_for_notification(kwargs: list) -> str:
        if 'timeout' in kwargs and kwargs['timeout'] != '':
            result = f"After waiting '{kwargs['timeout']}' seconds"
        else:
            result = f"After waiting '5' seconds"

        return result

    @staticmethod
    def wait_for_notification_and_validate_it(kwargs: list) -> str:
        expected_parameters = ['expected_subscription_id', 'expected_context_source_registration_ids',
                               'expected_trigger_reason', 'expected_notification_data_entities',
                               'timeout']

        if 'expected_notification_data_entities' not in kwargs:
            kwargs['expected_notification_data_entities'] = "${EMPTY}"

        if 'timeout' not in kwargs:
            kwargs['timeout'] = '5'

        result = [x for x in kwargs if x not in expected_parameters]
        response = "Waiting for Notification and validate it"
        for key, value in kwargs.items():
            match key:
                case 'expected_subscription_id':
                    response = f"{response} and\n    Query Parameter: expected_subscription_id set to '{value}'"
                case 'expected_context_source_registration_ids':
                    response = f"{response} and\n    Query Parameter: expected_context_source_registration_ids set to '{value}'"
                case 'expected_trigger_reason':
                    response = f"{response} and\n    Query Parameter: expected_trigger_reason set to '{value}'"
                case 'expected_notification_data_entities':
                    response = f"{response} and\n    Query Parameter: expected_notification_data_entities set to '{value}'"
                case 'timeout':
                    response = f"{response} and\n    Query Parameter: timeout set to '{value}'"
                # If an exact match is not confirmed, this last case will be used if provided
                case 'checks':
                    pass
                case _:
                    raise Exception(f"ERROR, unexpected attribute '{result}', the attributes expected are "
                                    f"'{expected_parameters}', but received: {kwargs}")

        return response

    @staticmethod
    def wait_for_no_notification(kwargs: list) -> str:
        expected_parameters = ['timeout']

        if 'timeout' not in kwargs:
            kwargs['timeout'] = '5'

        result = [x for x in kwargs if x not in expected_parameters]
        response = "Waiting for no Notification data"
        for key, value in kwargs.items():
            match key:
                case 'timeout':
                    response = f"{response} and\n    Query Parameter: timeout set to '{value}'"
                # If an exact match is not confirmed, this last case will be used if provided
                case 'checks':
                    pass
                case _:
                    raise Exception(f"ERROR, unexpected attribute '{result}', the attributes expected are "
                                    f"'{expected_parameters}', but received: {kwargs}")

        return response

    @staticmethod
    def check_response_body_containing_array_of_uris_set_to(kwargs: list) -> str:
        return 'Response Body set to an array of created entities ids'

    @staticmethod
    def check_created_resources_set_to(kwargs: list) -> str:
        return 'Created resources set to ${entities}'

    @staticmethod
    def dictionary_should_contain_key(kwargs: list) -> str:
        if 'dictionary' in kwargs and 'key' in kwargs:
            return f"The dictionary `{kwargs['dictionary']}' should contain the key '{kwargs['key']}'"
        else:
            raise Exception(f'ERROR, Expected dictionary and key parameters but received: {kwargs}')

    @staticmethod
    def should_not_be_empty(kwargs: list) -> str:
        if 'variable' in kwargs:
            return f"The variable `{kwargs['variable']}' should not be '${{EMPTY}}'"
        else:
            raise Exception(f'ERROR, Expected dictionary and key parameters but received: {kwargs}')

    @staticmethod
    def should_be_true(kwargs: list) -> str:
        if 'expression' in kwargs:
            return f"The expression `{kwargs['expression']}' should be True"
        else:
            raise Exception(f'ERROR, Expected dictionary and key parameters but received: {kwargs}')

lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def check_response_headers_containing_content_type_set_to(kwargs: list) -> str:
        if "content_type" in kwargs:
            content_type = kwargs['content_type']
            return f'Response Header: Content-Type set to {content_type}'
        else:
            raise Exception(f'ERROR, Expected status_code parameter but received: {kwargs}')

    @staticmethod
    def check_response_headers_link_not_empty(kwargs: list) -> str:
        return f'Response Header: Link is not Empty'

    @staticmethod
    def check_response_headers_containing_uri_set_to(kwargs: list) -> str:
        return 'Response Header: Location containing ${registration_id}'

    @staticmethod
    def check_response_headers_id_not_empty(kwargs: list) -> str:
        return 'Response Header: Location is not Empty'

    @staticmethod
    def check_response_body_containing_an_attribute_set_to(kwargs: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        expected_parameters = ['checks', 'expected_attribute_name', 'response_body', 'expected_attribute_value']

        result = [x for x in kwargs if x not in expected_parameters]
        response = "Check Response Body containing an Attribute set to"
        for key, value in kwargs.items():
            match key:
                case 'expected_attribute_name':
                    response = f"{response} and\n    Query Parameter: expected_attribute_name set to '{value}'"
                case 'response_body':
                    response = f"{response} and\n    Query Parameter: response_body set to '{value}'"
                case 'expected_attribute_value':
                    response = f"{response} and\n    Query Parameter: expected_attribute_value set to '{value}'"
                # If an exact match is not confirmed, this last case will be used if provided
                case 'checks':
                    pass
                case _:
                    raise Exception(f"ERROR, unexpected attribute '{result}', the attributes expected are "
                                    f"'{expected_parameters}', but received: {kwargs}")

        return response
lopezaguilar's avatar
lopezaguilar committed

lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def check_response_body_containing_entity_element(kwargs: list) -> str:
        if 'filename' in kwargs and 'id' in kwargs and 'response' in kwargs:
            return f"Response Body containing en entity element with id set to '{kwargs['id']}' and body content set to '{kwargs['filename']}'"
lopezaguilar's avatar
lopezaguilar committed

    @staticmethod
    def check_response_body_containing_list_containing_entity_elements(kwargs: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        return 'Response Body containing a list containing Entity Elements, containing ${value} provided'
lopezaguilar's avatar
lopezaguilar committed

    @staticmethod
    def check_response_body_containing_list_containing_entity_elements_with_different_types(kwargs: list) -> str:
        return ('Response Body containing a list containing Entity elements, '
                'containing a list of entity types to be retrieved')

    @staticmethod
    def check_response_body_containing_entitytemporal_element(kwargs: list) -> str:
        return ('Response Body containing EntityTemporal element containing attribute instances '
                'in the time range specified by the NGSI-LD temporal query')

    @staticmethod
    def check_response_body_containing_list_containing_entitytemporal_elements(kwargs: list) -> str:
        if 'filename' in kwargs and 'entity_ids' in kwargs:
            return (f"Request response body containing a list that contains Entity Temporal Elements\n"
                    f"    compared with file '{kwargs['filename']}'\n"
                    f"    and using the list of entity ids define in '{kwargs['entity_ids']}'")
lopezaguilar's avatar
lopezaguilar committed
        else:
            raise Exception(f"ERROR, expected parameters 'filename' and 'entity_ids', but received '{kwargs}'")
lopezaguilar's avatar
lopezaguilar committed

    @staticmethod
    def check_response_body_containing_subscription_element(kwargs: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        if 'filename' in kwargs and 'subscription_id' in kwargs and 'response_body' in kwargs:
            return (f"Response Body containing the same content defined in file '{kwargs['filename']}'"
                    f" with subscription id '{kwargs['subscription_id']}'")
        else:
            raise Exception(f"ERROR, expected parameters 'filename' and 'entity_ids', but received '{kwargs}'")
lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
lopezaguilar's avatar
lopezaguilar committed
    def check_response_body_containing_list_containing_subscription_elements(kwargs: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        if 'file' in kwargs and 'id' in kwargs and 'response' in kwargs:
            return (f"Response containing:\n"
                    f"    * file set to '{kwargs['file']}'\n"
                    f"    * id set to '{kwargs['id']}'\n"
                    f"    * response set to '{kwargs['response']}'")
lopezaguilar's avatar
lopezaguilar committed
        else:
lopezaguilar's avatar
lopezaguilar committed
            raise Exception(f"ERROR, expected 'file', 'id', and 'response' attributes, received: '{kwargs}'")
lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def check_response_body_containing_number_of_entities(kwargs: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        if "entity_type" in kwargs and 'number_entities' in kwargs and 'response' in kwargs:
            number_entities = kwargs['number_entities']
lopezaguilar's avatar
lopezaguilar committed
            entity_type = kwargs['entity_type']
lopezaguilar's avatar
lopezaguilar committed
            response = kwargs['response']
            return (f"Response Body containing a list of entities equal to '{number_entities}' of type '{entity_type}' "
                    f"with response set to '{response}'")
lopezaguilar's avatar
lopezaguilar committed
        else:
            raise Exception(f'ERROR, expected entity_type and number_entities attributes, but received: {kwargs}')

    @staticmethod
    def check_response_body_containing_context_source_registration_element(kwargs: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        if 'file' in kwargs and 'id' in kwargs and 'response' in kwargs:
            return (f"Response containing:\n"
                    f"    * file set to '{kwargs['file']}'\n"
                    f"    * id set to '{kwargs['id']}'\n"
                    f"    * response set to '{kwargs['response']}'")
lopezaguilar's avatar
lopezaguilar committed
        else:
lopezaguilar's avatar
lopezaguilar committed
            raise Exception(f"ERROR, expected 'file', 'id', and 'response' attributes, received: '{kwargs}'")
        # if 'csr_description' in kwargs:
        #     csr = kwargs['csr_description']
        #     return f"Response body containing a '{csr}'"
        # else:
        #     raise Exception(f"ERROR, expected csr_description attribute, but received: {kwargs}")
lopezaguilar's avatar
lopezaguilar committed

    @staticmethod
    def check_response_body_containing_entitytypelist_element(kwargs: list) -> str:
        if 'filename' in kwargs and 'response' in kwargs:
            return f"Response Body containing an Entity Type List with expectation body equal to file: '{kwargs['filename']}'"
lopezaguilar's avatar
lopezaguilar committed
        else:
            raise Exception(f"ERROR, expected filename and response attributes, but received: {kwargs}")
lopezaguilar's avatar
lopezaguilar committed

    @staticmethod
    def check_response_body_containing_entitytype_element(kwargs: list) -> str:
        if 'filename' in kwargs and 'response' in kwargs:
            description = kwargs['filename']
            return f"Response Body containing an Entity Type Element with expectation body equal to file: '{kwargs['filename']}'"
lopezaguilar's avatar
lopezaguilar committed
        else:
            raise Exception(f"ERROR, expected filename and response attributes, but received: {kwargs}")
lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
lopezaguilar's avatar
lopezaguilar committed
    def check_response_body_containing_entitytypeinfo_element(kwargs: list) -> str:
        return 'Response Body containing an Entity Type Info'

lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
lopezaguilar's avatar
lopezaguilar committed
    def check_response_body_containing_attributelist_element(kwargs: list) -> str:
        if 'filename' in kwargs and 'response' in kwargs:
            return (f"Response Body containing an Attribute List element"
                    f"\n    * with filename set to '{kwargs['filename']}'"
                    f"\n    * response set to '{kwargs['response']}'")
lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def check_response_body_containing_attribute_element(kwargs: list) -> str:
        return 'Response Body containing an array of Attributes'

    @staticmethod
    def check_response_body_containing_list_containing_context_source_registrations_elements(kwargs: list) -> str:
        return 'Response body set to list of all matching Context Source Registrations resolved against the default JSON-LD context'

    @staticmethod
    def check_response_body_type_when_using_session_request(kwargs: list) -> str:
        if 'type' in kwargs:
            type = kwargs['type']
            return f"Response Body containing the type '{type}'"
        else:
            raise Exception(f"ERROR, expected type attribute, but received: {kwargs}")

    @staticmethod
    def check_response_body_containing_problemdetails_element_containing_type_element_set_to(kwargs: list) -> str:
        if 'type' in kwargs:
            type = kwargs['type']
            return f"Response Body containing the type '{type}'"
        else:
            raise Exception(f"ERROR, expected type attribute, but received: {kwargs}")

    @staticmethod
    def check_response_body_title_when_using_session_request(kwargs: list) -> str:
        return "Response body containing 'title' element"

    @staticmethod
    def check_response_body_containing_problemdetails_element_containing_title_element(kwargs: list) -> str:
        return "Response body containing 'title' element"

lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
lopezaguilar's avatar
lopezaguilar committed
    def check_rl_response_body_containing_problemdetails_element_containing_type_element_set_to(kwargs: list) -> str:
        if 'type' in kwargs:
            type = kwargs['type']
            return f"Response Body containing the type '{type}'"
        else:
            raise Exception(f"ERROR, expected type attribute, but received: {kwargs}")

lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def check_rl_response_body_containing_problemdetails_element_containing_title_element(kwargs: list) -> str:
        return "Response body containing 'title' element"

    @staticmethod
    def check_json_value_in_response_body(kwargs: list) -> str:
        if 'key' in kwargs and 'value' in kwargs:
            key = kwargs['key']
            value = kwargs['value']
            return f"Response Body containing the key '{key}', with the value '{value}'"
        else:
            raise Exception(f"ERROR, expected key and value attributes, but received: {kwargs}")

    @staticmethod
    def check_pagination_prev_and_next_headers(kwargs: list) -> str:
        previous = None
        next = None
        if 'previous' in kwargs:
            previous = kwargs['previous']
            previous_text = f"with 'Prev' header equal to '{previous}'"

        if 'next' in kwargs:
            next = kwargs['next']
            next_text = f"with 'Next' header equal to '{next}'"

        if previous is None and next is None:
            raise Exception(f"ERROR, expected previous or next attributes, but received: {kwargs}")
        elif previous is not None and next is None:
            result = f"Response header {previous_text}"
        elif previous is None and next is not None:
            result = f"Response header {next_text}"
        else:
lopezaguilar's avatar
lopezaguilar committed
            result = f"Response header {previous_text} and {next_text}"
lopezaguilar's avatar
lopezaguilar committed

        return result

    @staticmethod
    def check_created_resource_set_to(kwargs: list) -> str:
            return "Created Entity set to ${entity}"

    @staticmethod
lopezaguilar's avatar
lopezaguilar committed
    def check_updated_resource_set_to(kwargs: list) -> str:
            return "Updated Entity set to ${entity}"

    @staticmethod
    def check_updated_resources_set_to(kwargs: list) -> str:
        if 'number_entities' in kwargs:
            number_entities = kwargs['number_entities']
            return f"Updated Entities set to '{number_entities}' valid entities"

    @staticmethod
lopezaguilar's avatar
lopezaguilar committed
    def check_sut_not_containing_resource(kwargs: list) -> str:
        if "status_code" in kwargs:
            status_code = kwargs['status_code']
            try:
                return f'Response Status Code set to {status_code} ({HTTPStatus(status_code).phrase})'
            except ValueError:
                return f'Response Status Code set to {status_code}'
lopezaguilar's avatar
lopezaguilar committed
        else:
            raise Exception(f'ERROR, Expected status_code parameter but received: {kwargs}')

    @staticmethod
    def check_sut_not_containing_resources(kwargs: list) -> str:
        return f'Response body is empty'

    @staticmethod
    def check_notificationparams(kwargs: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        if 'filename' in kwargs and 'expected_additional_members' in kwargs and 'response_body' in kwargs:
            return (f"Response containing:\n"
                    f"    * Notification expectation file path set to '{kwargs['filename']}'\n"
                    f"    * Expected Additional Members set to '{kwargs['expected_additional_members']}'\n"
                    f"    * Response Body set to '{kwargs['response_body']}'\n")
lopezaguilar's avatar
lopezaguilar committed
            raise Exception(f"ERROR, expected 'filename', 'expected_additional_members', and 'response_body', "
                            f"but received '{kwargs}'")

lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def check_response_body_containing_batch_operation_result(kwargs: list) -> str:
        if "operation" in kwargs:
            return f"Response Status Code set to '{kwargs['operation']}'"
        else:
            raise Exception(f'ERROR, Expected operation parameter but received: {kwargs}')

lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def check_response_body_content(kwargs: list) -> str:
        expected_parameters = ['expectation_filename', 'response_body', 'additional_ignored_path']

        if 'additional_ignored_path' not in kwargs:
            kwargs['additional_ignored_path'] = '${EMPTY}'

        result = [x for x in kwargs if x not in expected_parameters]
        response = "Check Response Body Content"
        for key, value in kwargs.items():
            match key:
                case 'expectation_filename':
                    response = f"{response} and\n    Query Parameter: expectation_filename set to '{value}'"
                case 'response_body':
                    response = f"{response} and\n    Query Parameter: response_body set to '{value}'"
                case 'additional_ignored_path':
                    response = f"{response} and\n    Query Parameter: additional_ignored_path set to '{value}'"
                # If an exact match is not confirmed, this last case will be used if provided
                case 'checks':
                    pass
                case _:
                    raise Exception(f"ERROR, unexpected attribute '{result}', the attributes expected are "
                                    f"'{expected_parameters}', but received: {kwargs}")

        return response


lopezaguilar's avatar
lopezaguilar committed
    @staticmethod
    def should_be_equal(kwargs: list) -> str:
        if 'expected_value' in kwargs and 'obtained_value' in kwargs:
            return f"Notification data: '{kwargs['obtained_value']}' equal to '{kwargs['expected_value']}'"
        else:
            raise Exception(f"ERROR, Expected 'expected_value' and 'obtained_value' parameters but received: '{kwargs}'")

    def get_checks(self, **kwargs) -> str:
        checking = None

        if "checks" in kwargs:
            checking = kwargs["checks"]
        else:
            raise Exception(f'ERROR,  the attribute checks is mandatory, but received: {kwargs}')

        if isinstance(checking, str):
            result = self.checks[checking](kwargs)
        elif isinstance(checking, list):
            result = [self.checks[x](kwargs) for x in checking]
            result = " and\n".join(result)
        else:
            raise Exception(f"ERROR, checks type not supported: {checking}")

        return result

if __name__ == "__main__":
    data = Checks()

lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check Response Status Code',
                          status_code=201))
    print(data.get_checks(checks='Check Response Body Containing Array Of URIs set to'))
    print(data.get_checks(checks='Check Created Resources Set To'))
lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check Response Headers Containing Content-Type set to',
                          content_type='application/json'))
lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check Response Headers Link Not Empty'))
    print(data.get_checks(checks='Check Response Headers Containing URI set to'))
    print(data.get_checks(checks='Check Response Headers ID Not Empty'))
lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check Response Body Containing an Attribute set to',
                          attribute_name='status'))
    print(data.get_checks(checks='Check Response Body Containing an Attribute set to',
                          attribute_name='status',
                          attribute_value='active'))
    print(data.get_checks(checks='Check Response Body Containing Entity element'))
    print(data.get_checks(checks='Check Response Body Containing List Containing Entity Elements'))
    print(data.get_checks(checks='Check Response Body Containing List Containing Entity Elements With Different Types'))
    print(data.get_checks(checks='Check Response Body Containing EntityTemporal element'))
    print(data.get_checks(checks='Check Response Body Containing List Containing EntityTemporal elements',
                          timeRel='after',
                          timeAt='2020-07-01T12:05:00Z'))
    print(data.get_checks(checks='Check Response Body Containing Subscription element'))
    print(data.get_checks(checks='Check Response Body Containing List Containing Subscription elements',
                          number=2))
    print(data.get_checks(checks='Check Response Body Containing List Containing Subscription elements',
                          number=1))
lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check Response Body Containing Number Of Entities',
                          entity_type="Vehicle",
                          number_entities=3))
    print(data.get_checks(checks='Check Response Body Containing Context Source Registration element',
                          csr_description='Context Source Registration'))
    print(data.get_checks(checks='Check Response Body Containing EntityTypeList element',
                          description='Json object with list of entity types with context'))
    print(data.get_checks(checks='Check Response Body Containing EntityType element',
                          description='Json object with an entity type with context'))
    print(data.get_checks(checks='Check Response Body Containing EntityTypeInfo element'))
    print(data.get_checks(checks='Check Response Body Containing AttributeList element'))
lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check Response Body Containing Attribute element'))
    print(data.get_checks(checks='Check Response Body Containing List Containing Context Source Registrations elements'))
    print(data.get_checks(checks='Check Response Body Type When Using Session Request',
                          type='https://uri.etsi.org/ngsi-ld/errors/BadRequestData'))
    print(data.get_checks(checks='Check Response Body Containing ProblemDetails Element Containing Type Element set to',
                          type='https://uri.etsi.org/ngsi-ld/errors/BadRequestData'))
    print(data.get_checks(checks='Check Response Body Title When Using Session Request'))
    print(data.get_checks(checks='Check Response Body Containing ProblemDetails Element Containing Title Element'))
    print(data.get_checks(checks='Check RL Response Body Containing ProblemDetails Element Containing Type Element set to',
                          type='https://uri.etsi.org/ngsi-ld/errors/BadRequestData'))
lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check RL Response Body Containing ProblemDetails Element Containing Title Element'))
    print(data.get_checks(checks='Check JSON Value In Response Body',
                          key="['information']['entities'][0]['type']",
                          value="Building"))
    print(data.get_checks(checks='Check Pagination Prev And Next Headers',
                          previous='</ngsi-ld/v1/csourceSubscriptions?limit=1&page=1>;rel="prev";type="application/ld+json"',
                          next='</ngsi-ld/v1/csourceSubscriptions?limit=1&page=3>;rel="next";type="application/ld+json"'))
    print(data.get_checks(checks='Check Pagination Prev And Next Headers',
                          previous='',
                          next='</ngsi-ld/v1/csourceSubscriptions?limit=1&page=3>;rel="next";type="application/ld+json"'))
    print(data.get_checks(checks='Check Pagination Prev And Next Headers',
                          previous='</ngsi-ld/v1/csourceSubscriptions?limit=1&page=1>;rel="prev";type="application/ld+json"',
                          next=''))
    print(data.get_checks(checks='Check Pagination Prev And Next Headers',
                          previous='',
                          next=''))
    print(data.get_checks(checks='Check Created Resource Set To'))
    print(data.get_checks(checks='Check Updated Resource Set To'))
    print(data.get_checks(checks='Check Updated Resources Set To',
                          number_entities=2))
lopezaguilar's avatar
lopezaguilar committed
    print(data.get_checks(checks='Check SUT Not Containing Resource',
                          status_code=404))
    print(data.get_checks(checks='Check SUT Not Containing Resources'))
    print(data.get_checks(checks='Check NotificationParams',
                          format="keyValues",
                          uri="http://my.endpoint.org/notify",
                          accept="application/json",
                          status="ok",
                          timesSent="1"))

    print()
    print(data.get_checks(checks=
                          ['Check Response Status Code',
                           'Check Response Body Containing Array Of URIs set to',
                           'Check Created Resources Set To']
                          , status_code=201))
lopezaguilar's avatar
lopezaguilar committed
    print()
lopezaguilar's avatar
lopezaguilar committed
    # Check exceptions
    try:
        print(data.get_checks(checks='Check Response Body Containing an Attribute set to'))
    except Exception as e:
        print(e)