Newer
Older
from doc.analysis.parserobotfile import ParseRobotFile
from doc.analysis.parseapiutilsfile import ParseApiUtilsFile
from doc.analysis.parsevariablesfile import ParseVariablesFile
from doc.analysis.initial_setup import InitialSetup
lopezaguilar
committed
from re import match, findall, finditer, sub, MULTILINE
class GenerateRobotData:
def __init__(self, robot_file: str, execdir: str):
self.robot_file = robot_file
self.execdir = execdir
self.suite = TestSuiteBuilder().build(robot_file)
self.config_variables = ParseVariablesFile()
self.robot = ParseRobotFile(filename=robot_file, execdir=execdir, config_file=self.config_variables)
self.apiutils = [ParseApiUtilsFile(filename=file) for file in self.robot.resource_files]
self.robot.set_apiutils(self.apiutils)
self.test_cases = list()
self.test_suite = dict()
self.tags_template = list()
self.documentation_template = str()
self.arguments = list()
self.args = list()
self.identifier = {
'ContextInformation': 'CI',
'CommonBehaviours': 'CB',
'Consumption': 'Cons',
'Provision': 'Prov',
'Discovery/RetrieveAvailableAttributeInformation': 'DISC',
'Discovery/RetrieveAvailableEntityTypeInformation': 'DISC',
'Discovery/RetrieveAvailableEntityTypes': 'DISC',
'Discovery/RetrieveDetailsOfAvailableEntityTypes': 'DISC',
'Discovery/RetrieveAvailableAttributes': 'DISC',
'Discovery/RetrieveDetailsOfAvailableAttributes': 'DISC',
'Entity/RetrieveEntity': 'E',
'Entities/CreateEntity': 'E',
'Entities/DeleteEntity': 'E',
'EntityAttributes/AppendEntityAttributes': 'EA',
'EntityAttributes/UpdateEntityAttributes': 'EA',
'EntityAttributes/PartialAttributeUpdate': 'EA',
'EntityAttributes/DeleteEntityAttribute': 'EA',
'BatchEntities/CreateBatchOfEntities': 'BE',
'BatchEntities/UpsertBatchOfEntities': 'BE',
'BatchEntities/DeleteBatchOfEntities': 'BE',
'TemporalEntity/QueryTemporalEvolutionOfEntities': 'TE',
'TemporalEntity/DeleteTemporalRepresentationOfEntity': 'TE',
'TemporalEntity/UpdateTemporalRepresentationOfEntity': 'TE',
'TemporalEntity/RetrieveTemporalEvolutionOfEntity': 'TE',
'TemporalEntity/CreateTemporalRepresentationOfEntity': 'TE',
'TemporalEntityAttributes/DeleteAttributeInstance': 'TEA',
'TemporalEntityAttributes/DeleteAttribute': 'TEA',
'TemporalEntityAttributes/PartialUpdateAttributeInstance': 'TEA',
'TemporalEntityAttributes/AddAttributes': 'TEA',
'Subscription/DeleteSubscription': 'SUB',
'Subscription/QuerySubscriptions': 'SUB',
'Subscription/RetrieveSubscription': 'SUB',
'Subscription/UpdateSubscription': 'SUB',
'Registration/CreateContextSourceRegistration': 'REG',
'Registration/CreateCSRegistration': 'REG',
'Registration/UpdateCSRegistration': 'REG',
'Registration/DeleteCSRegistration': 'REG',
'RegistrationSubscription/CreateCSRegistrationSubscription': 'REGSUB',
'RegistrationSubscription/UpdateCSRegistrationSubscription': 'REGSUB',
'RegistrationSubscription/RetrieveCSRegistrationSubscription': 'REGSUB',
'RegistrationSubscription/QueryCSRegistrationSubscriptions': 'REGSUB',
'RegistrationSubscription/DeleteCSRegistrationSubscription': 'REGSUB',
'RegistrationSubscription/CSRegistrationSubscriptionNotificationBehaviour': 'REGSUB',
'RegistrationSUBBehaviour': 'REGSUB',
'Discovery/RetrieveCSRegistration': 'DISC',
'Discovery/QueryCSRegistrations': 'DISC',
'CommonResponses/VerifyLdContextNotAvailable': 'HTTP',
'CommonResponses/VerifyMergePatchJson': 'HTTP',
'CommonResponses/VerifyGETWithoutAccept': 'HTTP',
'CommonResponses/VerifyUnsupportedMediaType': 'HTTP',
'CommonResponses/VerifyNotAcceptableMediaType': 'HTTP'
'v1.3.1': 'ETSI GS CIM 009 V1.3.1 []'
}
self.initial_conditions = {
'Setup Initial Entity':
'with {\n the SUT containing an initial Entity ${entity} with an id set to ${entityId}\n}',
'Initial State':
'with {\n the SUT in the "initial state"\n}'
}
self.headers = {
'${CONTENT_TYPE_LD_JSON}': 'Content-Type'
}
self.ids = {
'${ENTITIES_ENDPOINT_PATH}': '${entityId}'
}
self.base_TP_id = str()
lopezaguilar
committed
self.initial_setup = InitialSetup()
self.test_suite['robotpath'] = (self.robot_file.replace(f'{self.execdir}/TP/NGSI-LD/', '')
.replace(f'/{self.robot.test_suite}.robot', ''))
self.test_suite['robotfile'] = self.robot.test_suite
return self.test_suite
def parse_robot(self):
self.start_suite()
_ = [self.visit_test(test=x) for x in self.suite.tests]
_ = [self.get_step_data(test=x.name) for x in self.suite.tests]
self.test_suite['test_cases'] = self.test_cases
# Generate the permutation keys to provide the keys in test_cases list that are different
self.test_suite['permutations'] = self.get_permutation_keys(data=self.test_suite['test_cases'])
lopezaguilar
committed
# Generate the initial_condition of the test suite
self.test_suite['initial_condition'] = self.generate_initial_condition()
lopezaguilar
committed
aux = [x['setup'] for x in self.test_cases]
if all(element == aux[0] for element in aux[1:]):
aux = self.initial_setup.get_initial_condition(initial_condition=aux[0])
return aux
else:
print(f'Something went wrong, the test suite {self.suite.resource.source} '
f'has different "setup" values for its test cases.')
excluded_keys = ['doc', 'permutation_tp_id', 'setup', 'teardown', 'name', 'tags']
all_keys = [x for x in all_keys if x not in excluded_keys]
lopezaguilar
committed
keys_with_different_values = [
key for key in all_keys if any(d.get(key) != data[0].get(key) for d in data[1:])
]
lopezaguilar
committed
keys_with_different_values.sort()
def get_data_template(self, string: str) -> str:
if self.robot.test_template_name != '':
result = self.robot.string_test_template
else:
result = string
return result
def get_params(self, test_case: str):
test_case = self.get_data_template(string=test_case)
lines_starting_response = findall(r'^\s*\$\{response}.*|^\s*\$\{notification}.*', test_case, MULTILINE)
# If there is more than one line, it means that the test case has several operations, all of them to
# create the environment content to execute the last one, which is the correct one to test the Test Case
if (len(lines_starting_response) > 1 and
lopezaguilar
committed
any(map(lambda item: 'notification' in item, lines_starting_response)) is False):
# The last one corresponds to the execution of the test, the rest corresponds to the initial condition of
# test case...
response_to_check = lines_starting_response[-1]
else:
response_to_check = lines_starting_response[0]
index = test_case.find(response_to_check)
aux = test_case[index:].split('\n')
# Get the list of params of the function, they are the keys
if ' ... ' in aux[1]:
request = aux[0].split(' ')
request = [x for x in request if x != ''][1]
# We are in the case that the attributes are in following lines
for i in range(1, len(aux)):
if ' ... ' in aux[i]:
lopezaguilar
committed
param = match(pattern=regex, string=aux[i])
params.append(param.groups()[1])
else:
break
else:
# the attributes are in the same line
regex = r"\s*\$\{response\}=\s{4}(.*)"
lopezaguilar
committed
matches = finditer(regex, response_to_check, MULTILINE)
request = aux[0].split(' ')[2]
# We have two options from here, or the parameters are defined in the same line or the parameters are
# defined in following lines, next lines
lopezaguilar
committed
for a_match in matches:
lopezaguilar
committed
if len(a_match.groups()) == 1:
aux = a_match.group(1)
# Get the list of keys
params = aux.split(' ')[1:]
else:
raise Exception(f"Error, unexpected format, received: '{response_to_check}'")
return request, params
def get_step_data(self, test: str):
if self.robot.string_test_template == '':
string = self.robot.test_cases[test]
else:
string = self.robot.string_test_template
request, params = self.get_params(test_case=string)
verb, url, query_param = data.get_response(keyword=request)
index = None
for i, item in enumerate(self.test_cases):
if 'name' in item and item['name'] == test:
index = i
break
self.test_cases[index]['http_verb'] = verb
self.test_cases[index]['endpoint'] = self.get_values_url(keys=url,
query_param=query_param,
request=request,
params=params)
self.test_cases[index]['when'] = self.robot.generate_when_content(http_verb=self.test_cases[index]['http_verb'],
endpoint=self.test_cases[index]['endpoint'],
when=self.test_cases[index]['when'])
def check_header_parameters(self, params: list, test: str):
value = str()
# 1st case: value of the parameters are sent as it is
header_key = [x for x in params if x in self.headers.keys()]
if len(header_key) != 0:
key = header_key[0]
value = self.get_header_value(key=key)
else:
# 2nd case, maybe the params are defined in the way <param>=<value>
for k in self.headers:
aux = [x for x in params if k in x]
if len(aux) != 0:
key = aux[0].split('=')[1]
value = self.get_header_value(key=key)
else:
key = None
value = None
if key is not None and value is not None:
# Iterate over the list and find the index
index = None
for i, item in enumerate(self.test_cases):
if 'name' in item and item['name'] == test:
index = i
break
# self.test_cases[index]['params'] = params
self.test_cases[index][self.headers[key]] = value
def get_values_url(self, keys: list, query_param: bool, request: str, params: list) -> str:
data = [self.get_value_url(key=x, request=request, params=params) for x in keys]
if not query_param:
data = '/'.join(data).replace('//', '/').replace('?/', '?')
else:
aux = '/'.join(data[:-1]).replace('//', '/').replace('?/', '?')
data = f"{aux}?{data[-1]}"
def get_value_url(self, key: str, request: str, params: list) -> str:
key_to_search = f'${key}'
try:
flattened_list_variables = {k: v for d in self.apiutils for k, v in d.variables.items()}
value = flattened_list_variables[key_to_search]
except KeyError:
# It is not defined in ApiUtils, maybe in Robot File?
try:
value = self.robot.variables[key_to_search]
except KeyError:
# Maybe the url is defined in the proper resource file through an operation
try:
value = self.check_resource_for_url(string=key_to_search, request=request, params=params)
except KeyError:
# The variable is not defined, so it is keep as it is in the url
value = key
def check_resource_for_url(self, string: str, request: str, params: list) -> str:
data_file_contents = '\n'.join([x.file_contents for x in self.apiutils])
flattened_list_variables = {k: v for d in self.apiutils for k, v in d.variables.items()}
flattened_list_variables = {key.split(' ')[0]: value for key, value in flattened_list_variables.items()}
index1 = data_file_contents.find(string)
index2 = data_file_contents[index1:].find("\n")
line = data_file_contents[index1:index1 + index2]
if string in line:
if 'Get From Dictionary' in line:
# We have to obtain the information of the endpoint from the dictionary
aux = line.split("Get From Dictionary")[1].strip().split(" ")
key = aux[0]
value = aux[1]
if request == 'Batch Request Entities From File':
url = url_dict[params[0]]
else:
raise KeyError
return url
else:
raise KeyError
else:
raise KeyError
else:
raise KeyError
def get_header_value(self, key: str):
value = str()
# We can have a simple variable or a compound of two variables
count = key.count('$')
if count == 1:
# Get the value of the Header key
try:
except KeyError:
# It is not defined in ApiUtils, maybe in Robot File
try:
value = self.robot.variables[key]
except KeyError:
# ERROR, the header key is not defined
raise Exception(f"ERROR, the header key '{key}' is undefined")
elif count == 2:
keys = key.split("$")
key = f'${keys[1]}'
second_key = f'${keys[2]}'
try:
second_key = self.ids[key]
except KeyError:
raise Exception(f"ERROR: Need to manage the '{second_key}' in GenerateRobotData::self.ids")
value = f'{value}{second_key}'
except KeyError:
# It is not defined in ApiUtils, maybe in Robot File
try:
value = self.robot.variables[key]
value = f'{value}{second_key}'
except KeyError:
# ERROR, the header key is not defined
raise Exception(f"ERROR, the header key '{key}' is undefined")
return value
def start_suite(self):
"""Modify suite's tests to contain only every Xth."""
version = 'v1.3.1'
tp_id = self.generate_name()
reference, clauses = self.generate_reference(version=version)
# TODO: robotframework==7.0 has not property keywords in the class TestSuite(), we can get the data from
# [x.to_dict()['name'] for x in list(self.suite.resource.keywords)] but with some differences in execution code
self.test_suite = {
'tp_id': tp_id,
'test_objective': self.suite.doc,
'reference': reference,
lopezaguilar
committed
'config_id': str(),
'clauses': clauses,
'pics_selection': str(),
# 'keywords': [str(x) for x in self.suite.keywords],
'keywords': [x.to_dict()['name'] for x in list(self.suite.resource.keywords)],
lopezaguilar
committed
'initial_condition': str(),
'test_cases': list()
}
def visit_test(self, test):
# Get Tags associated to the test
if len(test.tags) == 0 and self.tags_template is not None:
tags = self.tags_template
else:
tags = list(test.tags)
# Get the Documentation associated to the test
if len(test.doc) == 0 and len(self.documentation_template) != 0:
documentation = self.documentation_template
else:
documentation = test.doc
# Get the Content-Type and Body associated to the Test
if len(self.args) != 0:
lopezaguilar
committed
# We are talking about Test Cases with Test Template, so we need to check the keyword content with the
# definition of the template
# Generate Checks for Test Data
then = self.robot.get_checks(test_name=test.template, apiutils=self.apiutils, name=test.name)
# Generate Request for Test Data
when = self.robot.get_request(test_name=test.template, name=test.name)
lopezaguilar
committed
# We are talking about a Test Cases without Test Template
# Generate Checks for Test Data
then = self.robot.get_checks(test_name=test.name, apiutils=self.apiutils, name=test.name)
when = self.robot.get_request(test_name=test.name, name=test.name)
test_case = {
'name': test.name,
'permutation_tp_id': f'{self.base_TP_id}/{test.name.split(" ")[0]}',
'setup': test.setup.name,
'teardown': test.teardown.name,
}
try:
self.test_suite['initial_condition'] = self.initial_conditions[test.setup.name]
string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
except KeyError:
self.test_suite['initial_condition'] = self.initial_conditions['Initial State']
string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
def get_body(self, string: str) -> str:
aux = string.split(' ')[1:]
if len(aux) >= 2:
aux = [self.get_body(x) for x in aux]
aux = ' '.join(aux)
elif len(aux) == 1:
lopezaguilar
committed
aux = sub(r'([a-z])([A-Z])', r'\1 \2', aux[0])
lopezaguilar
committed
aux = sub(r'([a-z])([A-Z])', r'\1 \2', string)
base_dir = dirname(dirname(dirname(__file__)))
tp_id = str(self.suite.source.parent).replace(f'{base_dir}/', "")
for key, value in self.identifier.items():
tp_id = tp_id.replace(key, value)
self.base_TP_id = tp_id
name = self.suite.name.replace(" ", "_")
tp_id = f'{self.base_TP_id}/{name}'
return tp_id
def generate_reference(self, version):
# Get the list of tags in the different tests
tags = [x.tags for x in self.suite.tests]
tags = [element for sublist in tags for element in sublist if element[0].isdigit()]
if len(tags) == 0:
# We have different tests cases that call a test template, maybe the Tags are defined in the template
reference, clauses = self.generate_reference_template(version=version)
if len(self.robot.test_template_name) == 0:
# We have normal tests cases
reference, clauses = self.generate_reference_testcases(tags=tags, version=version)
else:
# We have tests cases with information about tags but a template with information about documentation
reference, clauses = self.generate_reference_testcases(tags=tags, version=version)
_, _ = self.generate_reference_template(version=version, need_tags=False)
return reference, clauses
def generate_reference_template(self, version, need_tags=True):
# Get the list of arguments, we select the first one because the 2nd keyword corresponds
# to the teardown operation
args = [list(x.keywords)[0] for x in self.suite.tests]
args = [{str(x.parent): list(x.args)} for x in args]
self.args = dict()
_ = [self.args.update(x) for x in args]
template_name = list(set([list(x.keywords)[0].name for x in self.suite.tests]))[0]
# Due to the information of the tags are contained in the Keyword description of the template, we need to
# analyse the Keyword.
string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
reference, clauses = self.get_info_from_template(name=template_name,
string=string,
version=version,
need_tags=need_tags)
return reference, clauses
def get_info_from_template(self, name: str, string: str, version: str, need_tags: bool):
# TODO: Check that the name of the template is in the string receive
clauses = list()
tags = self.get_substring(string=string, key='[Tags]')
clauses = list(set([element.replace("_", ".")
for sublist in tags for element in tags if element[0].isdigit()]))
clauses.sort()
except IndexError:
raise Exception("ERROR, Probably [Tags] does not include reference to the section in the spec.")
if len(clauses) == 1:
reference = f'{self.references[version]}, clause {clauses[0]}'
elif len(clauses) > 1:
reference = f'{self.references[version]}, clauses {", ".join(clauses)}'
# clauses = f'PICS_{tag}'
self.arguments = self.get_substring(string=string, key='[Arguments]')
self.arguments = self.arguments[1:]
# Get the documentation
self.documentation_template = self.get_substring(string=string, key='[Documentation]')
self.documentation_template = self.documentation_template[1:][0]
return reference, clauses
@staticmethod
def get_substring(string: str, key: str):
pos1 = string.find(key) - 1
pos2 = string[pos1:].find('\n')
result = string[pos1:pos1+pos2].split(' ')
return result
def generate_reference_testcases(self, tags: list, version: str):
clauses = [x.replace("_", ".") for x in tags if match(pattern=r'^(\d+_\d+_\d+)|^(\d+_\d+)', string=x)]
clauses = list(set(clauses))
clauses.sort()
if len(clauses) == 0:
raise Exception(
f'ERROR: the Test Suite {self.suite.name} has different clauses or no clauses (Tags): {tags}\n'
f'Unable to select the corresponding Reference of this Test Suite')
elif len(clauses) == 1:
reference = f'{self.references[version]}, clause {clauses[0]}'
reference = f'{self.references[version]}, clauses {", ".join(clauses)}'
return reference, clauses