# [extraction residue removed: "Newer" / "Older" navigation labels from the web view]
from http import HTTPStatus
class Checks:
def __init__(self):
# Builds the lookup table used by get_checks(): each key is a check keyword
# name and each value in the first part of the literal is the handler
# function of this class that renders that check as a description string.
#
# NOTE(review): this literal looks truncated by extraction. It starts as a
# name -> handler table and, further down, continues with name -> dict
# entries carrying 'params' / 'position'. See the notes placed at the
# affected entries and confirm against the upstream file before relying on it.
self.checks = {
'Check Response Status Code':
Checks.check_response_status_code,
'Check Response Body Containing Array Of URIs set to':
Checks.check_response_body_containing_array_of_uris_set_to,
'Check Created Resources Set To':
Checks.check_created_resources_set_to,
'Check Response Headers Containing Content-Type set to':
Checks.check_response_headers_containing_content_type_set_to,
'Check Response Headers Link Not Empty':
Checks.check_response_headers_link_not_empty,
'Check Response Headers Containing URI set to':
Checks.check_response_headers_containing_uri_set_to,
'Check Response Headers ID Not Empty':
Checks.check_response_headers_id_not_empty,
'Check Response Body Containing an Attribute set to':
Checks.check_response_body_containing_an_attribute_set_to,
'Check Response Body Containing Entity element':
Checks.check_response_body_containing_entity_element,
'Check Response Body Containing List Containing Entity Elements':
Checks.check_response_body_containing_list_containing_entity_elements,
'Check Response Body Containing List Containing Entity Elements With Different Types':
Checks.check_response_body_containing_list_containing_entity_elements_with_different_types,
'Check Response Body Containing EntityTemporal element':
Checks.check_response_body_containing_entitytemporal_element,
'Check Response Body Containing List Containing EntityTemporal elements':
Checks.check_response_body_containing_list_containing_entitytemporal_elements,
'Check Response Body Containing Subscription element':
Checks.check_response_body_containing_subscription_element,
'Check Response Body Containing List Containing Subscription elements':
Checks.check_response_body_containing_list_containing_subscription_elements,
'Check Response Body Containing Number Of Entities':
Checks.check_response_body_containing_number_of_entities,
'Check Response Body Containing Context Source Registration element':
Checks.check_response_body_containing_context_source_registration_element,
'Check Response Body Containing EntityTypeList element':
Checks.check_response_body_containing_entitytypelist_element,
'Check Response Body Containing EntityType element':
Checks.check_response_body_containing_entitytype_element,
'Check Response Body Containing EntityTypeInfo element':
Checks.check_response_body_containing_entitytypeinfo_element,
'Check Response Body Containing AttributeList element':
Checks.check_response_body_containing_attributelist_element,
'Check Response Body Containing Attribute element':
Checks.check_response_body_containing_attribute_element,
'Check Response Body Containing List Containing Context Source Registrations elements':
Checks.check_response_body_containing_list_containing_context_source_registrations_elements,
'Check Response Body Type When Using Session Request':
Checks.check_response_body_type_when_using_session_request,
'Check Response Body Containing ProblemDetails Element Containing Type Element set to':
Checks.check_response_body_containing_problemdetails_element_containing_type_element_set_to,
'Check Response Body Title When Using Session Request':
Checks.check_response_body_title_when_using_session_request,
'Check Response Body Containing ProblemDetails Element Containing Title Element':
Checks.check_response_body_containing_problemdetails_element_containing_title_element,
'Check RL Response Body Containing ProblemDetails Element Containing Type Element set to':
Checks.check_rl_response_body_containing_problemdetails_element_containing_type_element_set_to,
'Check RL Response Body Containing ProblemDetails Element Containing Title Element':
Checks.check_rl_response_body_containing_problemdetails_element_containing_title_element,
'Check JSON Value In Response Body':
Checks.check_json_value_in_response_body,
'Check Pagination Prev And Next Headers':
Checks.check_pagination_prev_and_next_headers,
'Check Created Resource Set To':
Checks.check_created_resource_set_to,
'Check Updated Resource Set To':
Checks.check_updated_resource_set_to,
'Check Updated Resources Set To':
Checks.check_updated_resources_set_to,
# NOTE(review): the next value has no key in front of it. Presumably the
# key line 'Check SUT Not Containing Resource': was lost (that name is used
# by an entry further down and by the demo driver) — confirm.
Checks.check_sut_not_containing_resource,
'Check SUT Not Containing Resources':
Checks.check_sut_not_containing_resources,
'Check NotificationParams':
Checks.check_notificationparams,
'Check Response Body Containing Batch Operation Result':
Checks.check_response_body_containing_batch_operation_result,
# NOTE(review): from here on the values are dicts, not handlers. Each one
# lists under 'params' the keyword-argument names the matching handler reads
# from kwargs; 'position' presumably gives the argument positions in the
# keyword call — TODO confirm. It looks as if the closing brace of the
# handler table and the opening of a second table were lost here. As
# written, repeated keys such as 'Check Response Status Code' would replace
# the handler entries above.
'Check Response Status Code': {
'params': ['status_code'],
'position': [1]
},
'Check Response Body Containing ProblemDetails Element Containing Type Element set to': {
'params': ['type'],
'position': [2]
},
# NOTE(review): no 'position' entry here, unlike the neighbouring entries — confirm.
'Check Response Headers Containing Content-Type set to': {
'params': ['content_type'],
},
# NOTE(review): no 'position' entry here either — confirm.
'Check Response Body Containing an Attribute set to': {
'params': ['attribute_name'],
},
'Check Response Body Containing List Containing EntityTemporal elements': {
'params': ['filename', 'entity_ids'],
'position': [1, 2]
},
'Check Response Body Containing List Containing Subscription elements': {
'params': ['file', 'id', 'response'],
'position': [1, 2, 3]
},
'Check Response Body Containing Number Of Entities': {
'params': ['entity_type', 'number_entities'],
'position': []
},
'Check Response Body Containing Context Source Registration element': {
'params': ['file', 'id', 'response'],
'position': [1, 2, 3]
},
'Check Response Body Containing EntityTypeList element': {
'params': ['filename', 'response'],
'position': [1, 2]
},
'Check Response Body Containing EntityType element': {
'params': ['filename', 'response'],
'position': [1, 2]
},
'Check Response Body Type When Using Session Request': {
'params': ['type'],
'position': [2]
},
'Check RL Response Body Containing ProblemDetails Element Containing Type Element set to': {
'params': ['type'],
'position': [2]
},
'Check JSON Value In Response Body': {
'params': ['key', 'value'],
'position': [1, 2]
},
'Check Pagination Prev And Next Headers': {
'params': ['previous', 'next'],
'position': [2, 3]
},
'Check Updated Resources Set To': {
'params': ['number_entities'],
'position': []
},
'Check SUT Not Containing Resource': {
'params': ['status_code'],
'position': [1]
},
# NOTE(review): no 'position' entry here — confirm.
'Check NotificationParams': {
'params': ['filename', 'expected_additional_members', 'response_body'],
},
'Check Response Body Containing Batch Operation Result': {
'params': ['operation'],
'position': [1]
},
# NOTE(review): 'Should be Equal' and 'Wait for notification' have entries
# here but no handler entry is visible in the first part of the literal.
'Should be Equal': {
'params': ['expected_value', 'obtained_value'],
'position': [1, 2]
},
'Check Response Body Containing Subscription element': {
'params': ['filename', 'subscription_id', 'response_body'],
'position': [1, 2, 3]
},
'Wait for notification': {
'params': ['timeout'],
'position': []
},
'Check Response Body Containing AttributeList element': {
'params': ['filename', 'response'],
'position': [1, 2]
},
'Check Response Body Containing Entity element': {
'params': ['filename', 'id', 'response'],
'position': [1, 2, 3]
},
}
@staticmethod
def check_response_status_code(kwargs: list) -> str:
    """Describe the expected HTTP status code of a response.

    Reads ``kwargs['status_code']``. When the value is a code known to
    ``HTTPStatus`` its reason phrase is appended in parentheses; otherwise
    only the raw value is reported.

    Raises:
        Exception: if ``status_code`` is not present in ``kwargs``.
    """
    if "status_code" not in kwargs:
        raise Exception(f'ERROR, Expected status_code parameter but received: {kwargs}')

    code = kwargs['status_code']
    description = f'Response Status Code set to {code}'
    try:
        reason = HTTPStatus(code).phrase
    except ValueError:
        # Not a registered HTTP status: report the bare value.
        return description
    return f'{description} ({reason})'
def wait_for_notification(kwargs: list) -> str:
    """Describe how long the test waits for a notification.

    Uses ``kwargs['timeout']`` when it is present and not the empty string;
    otherwise the text falls back to '5' seconds.
    """
    seconds = '5'
    if 'timeout' in kwargs and kwargs['timeout'] != '':
        seconds = kwargs['timeout']
    return f"After waiting '{seconds}' seconds"
@staticmethod
def check_response_body_containing_array_of_uris_set_to(kwargs: list) -> str:
    """Return the fixed description of the 'array of URIs' body check.

    ``kwargs`` is not read; the parameter exists because get_checks() calls
    every handler with the same argument.
    """
    description = 'Response Body set to an array of created entities ids'
    return description
@staticmethod
def check_created_resources_set_to(kwargs: list) -> str:
    """Return the fixed description of the 'created resources' check.

    The ``${entities}`` placeholder is emitted verbatim; nothing is
    interpolated here and ``kwargs`` is not read.
    """
    description = 'Created resources set to ${entities}'
    return description
# [extraction residue removed: stray gutter line numbers 211-237 from the web view]
@staticmethod
def check_response_headers_containing_content_type_set_to(kwargs: dict) -> str:
    """Describe the expected Content-Type response header.

    Args:
        kwargs: keyword arguments of the check; must contain ``content_type``.

    Returns:
        The description including the expected content type.

    Raises:
        Exception: if ``content_type`` is missing. The message now names the
            parameter that is actually required (it used to say
            ``status_code``, a copy-paste leftover).
    """
    if "content_type" not in kwargs:
        raise Exception(f'ERROR, Expected content_type parameter but received: {kwargs}')
    return f"Response Header: Content-Type set to {kwargs['content_type']}"
@staticmethod
def check_response_headers_link_not_empty(kwargs: list) -> str:
    """Return the fixed description of the 'Link header not empty' check.

    ``kwargs`` is not read.
    """
    description = 'Response Header: Link is not Empty'
    return description
@staticmethod
def check_response_headers_containing_uri_set_to(kwargs: list) -> str:
    """Return the fixed description of the 'Location contains URI' check.

    The ``${registration_id}`` placeholder is emitted verbatim and ``kwargs``
    is not read.
    """
    description = 'Response Header: Location containing ${registration_id}'
    return description
@staticmethod
def check_response_headers_id_not_empty(kwargs: list) -> str:
    """Return the fixed description of the 'Location header not empty' check.

    ``kwargs`` is not read.
    """
    description = 'Response Header: Location is not Empty'
    return description
@staticmethod
def check_response_body_containing_an_attribute_set_to(kwargs: dict) -> str:
    """Describe an attribute (and optionally its value) expected in the body.

    Args:
        kwargs: keyword arguments of the check. ``attribute_name`` is
            required; ``attribute_value`` is optional and, when present, is
            appended to the description.

    Returns:
        The description of the expected attribute.

    Raises:
        Exception: if ``attribute_name`` is missing. The message now names
            that parameter (it used to say ``status_code``, a copy-paste
            leftover).
    """
    if "attribute_name" not in kwargs:
        raise Exception(f'ERROR, Expected attribute_name parameter but received: {kwargs}')

    description = f"Response Body contains the attribute: '{kwargs['attribute_name']}'"
    if "attribute_value" in kwargs:
        description += f", with the value: '{kwargs['attribute_value']}'"
    return description
@staticmethod
def check_response_body_containing_entity_element(kwargs: dict) -> str:
    """Describe the entity element expected in the response body.

    Args:
        kwargs: keyword arguments of the check; must contain ``filename``,
            ``id`` and ``response``. Only ``id`` and ``filename`` appear in
            the text; ``response`` is required but not rendered.

    Returns:
        The description of the expected entity element.

    Raises:
        Exception: if any of the three parameters is missing. Previously the
            function fell off the end and returned ``None``, which is not a
            ``str`` and breaks the join in get_checks() for list input.
    """
    required = ('filename', 'id', 'response')
    if not all(name in kwargs for name in required):
        raise Exception(f"ERROR, expected 'filename', 'id', and 'response' attributes, received: '{kwargs}'")
    return f"Response Body containing en entity element with id set to '{kwargs['id']}' and body content set to '{kwargs['filename']}'"
@staticmethod
def check_response_body_containing_list_containing_entity_elements(kwargs: list) -> str:
    """Return the fixed description of the 'list of Entity elements' check.

    The ``${value}`` placeholder is emitted verbatim and ``kwargs`` is not
    read.
    """
    description = 'Response Body containing a list containing Entity Elements, containing ${value} provided'
    return description
@staticmethod
def check_response_body_containing_list_containing_entity_elements_with_different_types(kwargs: list) -> str:
    """Return the fixed description of the 'entities of different types' check.

    ``kwargs`` is not read.
    """
    subject = 'Response Body containing a list containing Entity elements, '
    detail = 'containing a list of entity types to be retrieved'
    return subject + detail
@staticmethod
def check_response_body_containing_entitytemporal_element(kwargs: list) -> str:
    """Return the fixed description of the 'EntityTemporal element' check.

    ``kwargs`` is not read.
    """
    subject = 'Response Body containing EntityTemporal element containing attribute instances '
    detail = 'in the time range specified by the NGSI-LD temporal query'
    return subject + detail
@staticmethod
def check_response_body_containing_list_containing_entitytemporal_elements(kwargs: list) -> str:
    """Describe the expected list of EntityTemporal elements.

    Reads ``kwargs['filename']`` and ``kwargs['entity_ids']`` and renders a
    three-line description from them.

    Raises:
        Exception: if ``filename`` or ``entity_ids`` is missing.
    """
    if 'filename' not in kwargs or 'entity_ids' not in kwargs:
        raise Exception(f"ERROR, expected parameters 'filename' and 'entity_ids', but received '{kwargs}'")

    lines = [
        "Request response body containing a list that contains Entity Temporal Elements",
        f" compared with file '{kwargs['filename']}'",
        f" and using the list of entity ids define in '{kwargs['entity_ids']}'",
    ]
    return "\n".join(lines)
@staticmethod
def check_response_body_containing_subscription_element(kwargs: dict) -> str:
    """Describe the subscription element expected in the response body.

    Args:
        kwargs: keyword arguments of the check; must contain ``filename``,
            ``subscription_id`` and ``response_body``. Only ``filename`` and
            ``subscription_id`` appear in the text.

    Returns:
        The description of the expected subscription.

    Raises:
        Exception: if any required parameter is missing. The message now
            lists the parameters this function actually checks (it used to
            name ``'filename' and 'entity_ids'``, copied from another handler).
    """
    required = ('filename', 'subscription_id', 'response_body')
    if not all(name in kwargs for name in required):
        raise Exception(f"ERROR, expected parameters 'filename', 'subscription_id' and 'response_body', "
                        f"but received '{kwargs}'")
    return (f"Response Body containing the same content defined in file '{kwargs['filename']}'"
            f" with subscription id '{kwargs['subscription_id']}'")
def check_response_body_containing_list_containing_subscription_elements(kwargs: list) -> str:
    """Describe the expected list of subscription elements.

    Renders ``file``, ``id`` and ``response`` from ``kwargs`` as a bulleted,
    multi-line description.

    Raises:
        Exception: if ``file``, ``id`` or ``response`` is missing.
    """
    if 'file' not in kwargs or 'id' not in kwargs or 'response' not in kwargs:
        raise Exception(f"ERROR, expected 'file', 'id', and 'response' attributes, received: '{kwargs}'")

    lines = [
        "Response containing:",
        f" * file set to '{kwargs['file']}'",
        f" * id set to '{kwargs['id']}'",
        f" * response set to '{kwargs['response']}'",
    ]
    return "\n".join(lines)
@staticmethod
def check_response_body_containing_number_of_entities(kwargs: list) -> str:
    """Describe how many entities of a given type the body should list.

    Reads ``kwargs['entity_type']`` and ``kwargs['number_entities']``.

    Raises:
        Exception: if either parameter is missing.
    """
    if "entity_type" not in kwargs or 'number_entities' not in kwargs:
        raise Exception(f'ERROR, expected entity_type and number_entities attributes, but received: {kwargs}')

    count = kwargs['number_entities']
    kind = kwargs['entity_type']
    return f"Response Body containing a list of entities ({count}) of type '{kind}'"
@staticmethod
def check_response_body_containing_context_source_registration_element(kwargs: list) -> str:
    """Describe the expected Context Source Registration element.

    Renders ``file``, ``id`` and ``response`` from ``kwargs`` as a bulleted,
    multi-line description.

    Raises:
        Exception: if ``file``, ``id`` or ``response`` is missing.
    """
    required = ('file', 'id', 'response')
    if not all(name in kwargs for name in required):
        raise Exception(f"ERROR, expected 'file', 'id', and 'response' attributes, received: '{kwargs}'")

    header = "Response containing:\n"
    bullets = (f" * file set to '{kwargs['file']}'\n"
               f" * id set to '{kwargs['id']}'\n"
               f" * response set to '{kwargs['response']}'")
    return header + bullets
# if 'csr_description' in kwargs:
# csr = kwargs['csr_description']
# return f"Response body containing a '{csr}'"
# else:
# raise Exception(f"ERROR, expected csr_description attribute, but received: {kwargs}")
@staticmethod
def check_response_body_containing_entitytypelist_element(kwargs: list) -> str:
    """Describe the expected EntityTypeList element.

    Requires ``filename`` and ``response`` in ``kwargs``; only ``filename``
    appears in the text.

    Raises:
        Exception: if ``filename`` or ``response`` is missing.
    """
    if 'filename' not in kwargs or 'response' not in kwargs:
        raise Exception(f"ERROR, expected filename and response attributes, but received: {kwargs}")

    expectation_file = kwargs['filename']
    return f"Response Body containing an Entity Type List with expectation body equal to file: '{expectation_file}'"
@staticmethod
def check_response_body_containing_entitytype_element(kwargs: list) -> str:
    """Describe the expected EntityType element.

    Requires ``filename`` and ``response`` in ``kwargs``; only ``filename``
    appears in the text.

    Raises:
        Exception: if ``filename`` or ``response`` is missing.
    """
    if 'filename' not in kwargs or 'response' not in kwargs:
        raise Exception(f"ERROR, expected filename and response attributes, but received: {kwargs}")

    expectation_file = kwargs['filename']
    return f"Response Body containing an Entity Type Element with expectation body equal to file: '{expectation_file}'"
def check_response_body_containing_entitytypeinfo_element(kwargs: list) -> str:
    """Return the fixed description of the 'EntityTypeInfo element' check.

    ``kwargs`` is not read.
    """
    description = 'Response Body containing an Entity Type Info'
    return description
def check_response_body_containing_attributelist_element(kwargs: dict) -> str:
    """Describe the expected AttributeList element.

    Args:
        kwargs: keyword arguments of the check; must contain ``filename`` and
            ``response``, both of which are rendered.

    Returns:
        A multi-line description of the expected AttributeList element.

    Raises:
        Exception: if ``filename`` or ``response`` is missing. Previously the
            function fell off the end and returned ``None``, which is not a
            ``str`` and breaks the join in get_checks() for list input.
    """
    if 'filename' not in kwargs or 'response' not in kwargs:
        raise Exception(f"ERROR, expected filename and response attributes, but received: {kwargs}")
    return (f"Response Body containing an Attribute List element"
            f"\n * with filename set to '{kwargs['filename']}'"
            f"\n * response set to '{kwargs['response']}'")
# [extraction residue removed: stray gutter line numbers 343-374 from the web view]
@staticmethod
def check_response_body_containing_attribute_element(kwargs: list) -> str:
    """Return the fixed description of the 'Attribute element' check.

    ``kwargs`` is not read.
    """
    description = 'Response Body containing an array of Attributes'
    return description
@staticmethod
def check_response_body_containing_list_containing_context_source_registrations_elements(kwargs: list) -> str:
    """Return the fixed description of the 'list of Context Source Registrations' check.

    ``kwargs`` is not read.
    """
    description = ('Response body set to list of all matching Context Source Registrations '
                   'resolved against the default JSON-LD context')
    return description
@staticmethod
def check_response_body_type_when_using_session_request(kwargs: list) -> str:
    """Describe the ``type`` value expected in the response body.

    Reads ``kwargs['type']``.

    Raises:
        Exception: if ``type`` is missing.
    """
    if 'type' not in kwargs:
        raise Exception(f"ERROR, expected type attribute, but received: {kwargs}")
    expected_type = kwargs['type']
    return f"Response Body containing the type '{expected_type}'"
@staticmethod
def check_response_body_containing_problemdetails_element_containing_type_element_set_to(kwargs: list) -> str:
    """Describe the ProblemDetails ``type`` expected in the response body.

    Reads ``kwargs['type']``.

    Raises:
        Exception: if ``type`` is missing.
    """
    if 'type' not in kwargs:
        raise Exception(f"ERROR, expected type attribute, but received: {kwargs}")
    problem_type = kwargs['type']
    return f"Response Body containing the type '{problem_type}'"
@staticmethod
def check_response_body_title_when_using_session_request(kwargs: list) -> str:
    """Return the fixed description of the 'title element present' check.

    ``kwargs`` is not read.
    """
    description = "Response body containing 'title' element"
    return description
@staticmethod
def check_response_body_containing_problemdetails_element_containing_title_element(kwargs: list) -> str:
    """Return the fixed description of the ProblemDetails 'title' check.

    ``kwargs`` is not read.
    """
    description = "Response body containing 'title' element"
    return description
def check_rl_response_body_containing_problemdetails_element_containing_type_element_set_to(kwargs: list) -> str:
    """Describe the ProblemDetails ``type`` expected in the response body (RL variant).

    Reads ``kwargs['type']``.

    Raises:
        Exception: if ``type`` is missing.
    """
    if 'type' not in kwargs:
        raise Exception(f"ERROR, expected type attribute, but received: {kwargs}")
    problem_type = kwargs['type']
    return f"Response Body containing the type '{problem_type}'"
# [extraction residue removed: stray gutter line numbers 383-422 from the web view]
@staticmethod
def check_rl_response_body_containing_problemdetails_element_containing_title_element(kwargs: list) -> str:
    """Return the fixed description of the ProblemDetails 'title' check (RL variant).

    ``kwargs`` is not read.
    """
    description = "Response body containing 'title' element"
    return description
@staticmethod
def check_json_value_in_response_body(kwargs: list) -> str:
    """Describe a key/value pair expected in the response body.

    Reads ``kwargs['key']`` and ``kwargs['value']``.

    Raises:
        Exception: if ``key`` or ``value`` is missing.
    """
    if 'key' not in kwargs or 'value' not in kwargs:
        raise Exception(f"ERROR, expected key and value attributes, but received: {kwargs}")

    json_key, json_value = kwargs['key'], kwargs['value']
    return f"Response Body containing the key '{json_key}', with the value '{json_value}'"
@staticmethod
def check_pagination_prev_and_next_headers(kwargs: dict) -> str:
    """Describe the pagination 'Prev' / 'Next' headers expected in a response.

    Args:
        kwargs: keyword arguments of the check. ``previous`` and ``next`` are
            each optional, but at least one must be present with a value
            other than ``None``. An empty string counts as a supplied value,
            as it did before.

    Returns:
        The description mentioning each supplied header, joined by ' and '.

    Raises:
        Exception: if neither ``previous`` nor ``next`` is supplied.
    """
    # A local named ``next`` would shadow the builtin, so use explicit names.
    prev_header = kwargs.get('previous')
    next_header = kwargs.get('next')

    clauses = []
    if prev_header is not None:
        clauses.append(f"with 'Prev' header equal to '{prev_header}'")
    if next_header is not None:
        clauses.append(f"with 'Next' header equal to '{next_header}'")

    if not clauses:
        raise Exception(f"ERROR, expected previous or next attributes, but received: {kwargs}")

    # Fixes the "Response heacer" typo the two-header case used to emit.
    return "Response header " + " and ".join(clauses)
@staticmethod
def check_created_resource_set_to(kwargs: list) -> str:
    """Return the fixed description of the 'created resource' check.

    The ``${entity}`` placeholder is emitted verbatim and ``kwargs`` is not
    read.
    """
    description = "Created Entity set to ${entity}"
    return description
def check_updated_resource_set_to(kwargs: list) -> str:
    """Return the fixed description of the 'updated resource' check.

    The ``${entity}`` placeholder is emitted verbatim and ``kwargs`` is not
    read.
    """
    description = "Updated Entity set to ${entity}"
    return description
def check_updated_resources_set_to(kwargs: dict) -> str:
    """Describe how many entities are expected to have been updated.

    Args:
        kwargs: keyword arguments of the check; must contain
            ``number_entities``.

    Returns:
        The description including the expected number of entities.

    Raises:
        Exception: if ``number_entities`` is missing. Previously the function
            fell off the end and returned ``None``, which is not a ``str``
            and breaks the join in get_checks() for list input.
    """
    if 'number_entities' not in kwargs:
        raise Exception(f'ERROR, expected number_entities attribute, but received: {kwargs}')
    return f"Updated Entities set to '{kwargs['number_entities']}' valid entities"
def check_sut_not_containing_resource(kwargs: list) -> str:
    """Describe the status code expected when a resource is absent.

    Reads ``kwargs['status_code']``. When the value is a code known to
    ``HTTPStatus`` its reason phrase is appended in parentheses; otherwise
    only the raw value is reported.

    Raises:
        Exception: if ``status_code`` is not present in ``kwargs``.
    """
    if "status_code" not in kwargs:
        raise Exception(f'ERROR, Expected status_code parameter but received: {kwargs}')

    code = kwargs['status_code']
    description = f'Response Status Code set to {code}'
    try:
        reason = HTTPStatus(code).phrase
    except ValueError:
        # Not a registered HTTP status: report the bare value.
        return description
    return f'{description} ({reason})'
def check_sut_not_containing_resources(kwargs: list) -> str:
    """Return the fixed description of the 'no resources' check.

    ``kwargs`` is not read.
    """
    description = 'Response body is empty'
    return description
def check_notificationparams(kwargs: list) -> str:
    """Describe the notification parameters expected by the check.

    Renders ``filename``, ``expected_additional_members`` and
    ``response_body`` from ``kwargs`` as a bulleted description; every line,
    including the last one, ends with a newline.

    Raises:
        Exception: if any of the three parameters is missing.
    """
    required = ('filename', 'expected_additional_members', 'response_body')
    if not all(name in kwargs for name in required):
        raise Exception(f"ERROR, expected 'filename', 'expected_additional_members', and 'response_body', "
                        f"but received '{kwargs}'")

    lines = [
        "Response containing:",
        f" * Notification expectation file path set to '{kwargs['filename']}'",
        f" * Expected Additional Members set to '{kwargs['expected_additional_members']}'",
        f" * Response Body set to '{kwargs['response_body']}'",
    ]
    return "".join(f"{line}\n" for line in lines)
# expected_parameters = ["format", "uri", "accept", "status", "timesSent"]
# result = [x for x in expected_parameters if x not in kwargs]
#
# if len(result) == 0:
# response = ("Response containing:\n"
# f" * payload['format'] is equal to '{kwargs['format']}'\n"
# f" * payload['endpoint']['uri'] is equal to '{kwargs['uri']}'\n"
# f" * payload['endpoint']['accept'] is equal to '{kwargs['accept']}'\n"
# f" * payload['status'] is equal to '{kwargs['status']}'\n"
# f" * payload['timesSent'] is equal to '{kwargs['timesSent']}\n"
# f" * payload['notification']['lastNotification'] is not Empty\n"
# f" * payload['notification']['lastSuccess'] is not Empty\n")
# return response
# else:
# raise Exception(f"ERROR, unexpected attributes '{result}', the attributes expected are "
# f"'{expected_parameters}', but received: {kwargs}")
@staticmethod
def check_response_body_containing_batch_operation_result(kwargs: list) -> str:
    """Describe the batch operation result check.

    Reads ``kwargs['operation']`` and embeds it, quoted, in the text.

    Raises:
        Exception: if ``operation`` is missing.
    """
    if "operation" not in kwargs:
        raise Exception(f'ERROR, Expected operation parameter but received: {kwargs}')
    operation = kwargs['operation']
    return f"Response Status Code set to '{operation}'"
@staticmethod
def should_be_equal(kwargs: list) -> str:
    """Describe an equality expectation between obtained and expected values.

    Reads ``kwargs['expected_value']`` and ``kwargs['obtained_value']``.

    Raises:
        Exception: if either parameter is missing.
    """
    if 'expected_value' not in kwargs or 'obtained_value' not in kwargs:
        raise Exception(f"ERROR, Expected 'expected_value' and 'obtained_value' parameters but received: '{kwargs}'")

    obtained = kwargs['obtained_value']
    expected = kwargs['expected_value']
    return f"Notification data: '{obtained}' equal to '{expected}'"
def get_checks(self, **kwargs) -> str:
    """Render the description of one check, or of several joined together.

    ``kwargs['checks']`` selects what to render: a single check name, or a
    list of names whose descriptions are joined with " and" plus a newline.
    The complete ``kwargs`` dict (including ``checks``) is handed to each
    handler looked up in ``self.checks``.

    Raises:
        Exception: if ``checks`` is missing, or is neither a str nor a list.
        KeyError: if a requested name is not a key of ``self.checks``.
    """
    if "checks" not in kwargs:
        raise Exception(f'ERROR, the attribute checks is mandatory, but received: {kwargs}')

    requested = kwargs["checks"]

    if isinstance(requested, str):
        return self.checks[requested](kwargs)

    if isinstance(requested, list):
        descriptions = [self.checks[name](kwargs) for name in requested]
        return " and\n".join(descriptions)

    raise Exception(f"ERROR, checks type not supported: {requested}")
if __name__ == "__main__":
    # Demo driver: prints the description produced by get_checks() for a
    # series of check names, in a fixed order, then for a list of checks,
    # and finally shows the message of a deliberately failing call.
    data = Checks()

    bad_request = 'https://uri.etsi.org/ngsi-ld/errors/BadRequestData'
    prev_link = '</ngsi-ld/v1/csourceSubscriptions?limit=1&page=1>;rel="prev";type="application/ld+json"'
    next_link = '</ngsi-ld/v1/csourceSubscriptions?limit=1&page=3>;rel="next";type="application/ld+json"'

    # (check name, extra keyword arguments) pairs, in the order they are printed.
    single_check_demos = [
        ('Check Response Status Code', {'status_code': 201}),
        ('Check Response Body Containing Array Of URIs set to', {}),
        ('Check Created Resources Set To', {}),
        ('Check Response Headers Containing Content-Type set to', {'content_type': 'application/json'}),
        ('Check Response Headers Link Not Empty', {}),
        ('Check Response Headers Containing URI set to', {}),
        ('Check Response Headers ID Not Empty', {}),
        ('Check Response Body Containing an Attribute set to', {'attribute_name': 'status'}),
        ('Check Response Body Containing an Attribute set to',
         {'attribute_name': 'status', 'attribute_value': 'active'}),
        ('Check Response Body Containing Entity element', {}),
        ('Check Response Body Containing List Containing Entity Elements', {}),
        ('Check Response Body Containing List Containing Entity Elements With Different Types', {}),
        ('Check Response Body Containing EntityTemporal element', {}),
        ('Check Response Body Containing List Containing EntityTemporal elements',
         {'timeRel': 'after', 'timeAt': '2020-07-01T12:05:00Z'}),
        ('Check Response Body Containing Subscription element', {}),
        ('Check Response Body Containing List Containing Subscription elements', {'number': 2}),
        ('Check Response Body Containing List Containing Subscription elements', {'number': 1}),
        ('Check Response Body Containing Number Of Entities',
         {'entity_type': "Vehicle", 'number_entities': 3}),
        ('Check Response Body Containing Context Source Registration element',
         {'csr_description': 'Context Source Registration'}),
        ('Check Response Body Containing EntityTypeList element',
         {'description': 'Json object with list of entity types with context'}),
        ('Check Response Body Containing EntityType element',
         {'description': 'Json object with an entity type with context'}),
        ('Check Response Body Containing EntityTypeInfo element', {}),
        ('Check Response Body Containing AttributeList element', {}),
        ('Check Response Body Containing Attribute element', {}),
        ('Check Response Body Containing List Containing Context Source Registrations elements', {}),
        ('Check Response Body Type When Using Session Request', {'type': bad_request}),
        ('Check Response Body Containing ProblemDetails Element Containing Type Element set to',
         {'type': bad_request}),
        ('Check Response Body Title When Using Session Request', {}),
        ('Check Response Body Containing ProblemDetails Element Containing Title Element', {}),
        ('Check RL Response Body Containing ProblemDetails Element Containing Type Element set to',
         {'type': bad_request}),
        ('Check RL Response Body Containing ProblemDetails Element Containing Title Element', {}),
        ('Check JSON Value In Response Body',
         {'key': "['information']['entities'][0]['type']", 'value': "Building"}),
        ('Check Pagination Prev And Next Headers', {'previous': prev_link, 'next': next_link}),
        ('Check Pagination Prev And Next Headers', {'previous': '', 'next': next_link}),
        ('Check Pagination Prev And Next Headers', {'previous': prev_link, 'next': ''}),
        ('Check Pagination Prev And Next Headers', {'previous': '', 'next': ''}),
        ('Check Created Resource Set To', {}),
        ('Check Updated Resource Set To', {}),
        ('Check Updated Resources Set To', {'number_entities': 2}),
        ('Check SUT Not Containing Resource', {'status_code': 404}),
        ('Check SUT Not Containing Resources', {}),
        ('Check NotificationParams',
         {'format': "keyValues",
          'uri': "http://my.endpoint.org/notify",
          'accept': "application/json",
          'status': "ok",
          'timesSent': "1"}),
    ]
    for check_name, arguments in single_check_demos:
        print(data.get_checks(checks=check_name, **arguments))

    print()

    # Several checks at once: their descriptions are joined by get_checks().
    combined_checks = [
        'Check Response Status Code',
        'Check Response Body Containing Array Of URIs set to',
        'Check Created Resources Set To',
    ]
    print(data.get_checks(checks=combined_checks, status_code=201))

    # Check exceptions
    try:
        print(data.get_checks(checks='Check Response Body Containing an Attribute set to'))
    except Exception as e:
        print(e)