Newer
Older
from analysis.parserobotfile import ParseRobotFile
from analysis.parseapiutilsfile import ParseApiUtilsFile
from analysis.parsevariablesfile import ParseVariablesFile
from analysis.initial_setup import InitialSetup
lopezaguilar
committed
from re import match, findall, finditer, sub, MULTILINE
class GenerateRobotData:
def __init__(self, robot_file: str, execdir: str):
    """Load a Robot Framework suite and prepare the lookup tables used later.

    Args:
        robot_file: Path of the ``.robot`` file; it is handed to
            ``TestSuiteBuilder().build`` and to ``ParseRobotFile``.
        execdir: Execution directory, forwarded to ``ParseRobotFile``.
    """
    self.robot_file = robot_file
    self.execdir = execdir

    # Parsed views of the suite, its variables and its resource files.
    self.suite = TestSuiteBuilder().build(robot_file)
    self.config_variables = ParseVariablesFile()
    self.robot = ParseRobotFile(filename=robot_file, execdir=execdir, config_file=self.config_variables)
    self.apiutils = [ParseApiUtilsFile(filename=file) for file in self.robot.resource_files]
    self.robot.set_apiutils(self.apiutils)

    # Accumulators filled while the suite is visited.
    self.test_cases = list()
    self.test_suite = dict()
    self.tags_template = list()
    self.documentation_template = str()
    self.arguments = list()
    self.args = list()

    # Path fragment -> abbreviation; every key found in the suite path is
    # replaced by its value when the TP identifier is generated.
    self.identifier = {
        'ContextInformation': 'CI',
        'CommonBehaviours': 'CB',
        'Consumption': 'Cons',
        'Provision': 'Prov',
        'Discovery/RetrieveAvailableAttributeInformation': 'DISC',
        'Discovery/RetrieveAvailableEntityTypeInformation': 'DISC',
        'Discovery/RetrieveAvailableEntityTypes': 'DISC',
        'Discovery/RetrieveDetailsOfAvailableEntityTypes': 'DISC',
        'Discovery/RetrieveAvailableAttributes': 'DISC',
        'Discovery/RetrieveDetailsOfAvailableAttributes': 'DISC',
        'Entity/RetrieveEntity': 'E',
        'Entities/CreateEntity': 'E',
        'Entities/DeleteEntity': 'E',
        'EntityAttributes/AppendEntityAttributes': 'EA',
        'EntityAttributes/UpdateEntityAttributes': 'EA',
        'EntityAttributes/PartialAttributeUpdate': 'EA',
        'EntityAttributes/DeleteEntityAttribute': 'EA',
        'BatchEntities/CreateBatchOfEntities': 'BE',
        'BatchEntities/UpsertBatchOfEntities': 'BE',
        'BatchEntities/DeleteBatchOfEntities': 'BE',
        'TemporalEntity/QueryTemporalEvolutionOfEntities': 'TE',
        'TemporalEntity/DeleteTemporalRepresentationOfEntity': 'TE',
        'TemporalEntity/UpdateTemporalRepresentationOfEntity': 'TE',
        'TemporalEntity/RetrieveTemporalEvolutionOfEntity': 'TE',
        'TemporalEntity/CreateTemporalRepresentationOfEntity': 'TE',
        'TemporalEntityAttributes/DeleteAttributeInstance': 'TEA',
        'TemporalEntityAttributes/DeleteAttribute': 'TEA',
        'TemporalEntityAttributes/PartialUpdateAttributeInstance': 'TEA',
        'TemporalEntityAttributes/AddAttributes': 'TEA',
        'Subscription/DeleteSubscription': 'SUB',
        'Subscription/QuerySubscriptions': 'SUB',
        'Subscription/RetrieveSubscription': 'SUB',
        'Subscription/UpdateSubscription': 'SUB',
        'Registration/CreateContextSourceRegistration': 'REG',
        'Registration/CreateCSRegistration': 'REG',
        'Registration/UpdateCSRegistration': 'REG',
        'Registration/DeleteCSRegistration': 'REG',
        'RegistrationSubscription/CreateCSRegistrationSubscription': 'REGSUB',
        'RegistrationSubscription/UpdateCSRegistrationSubscription': 'REGSUB',
        'RegistrationSubscription/RetrieveCSRegistrationSubscription': 'REGSUB',
        'RegistrationSubscription/QueryCSRegistrationSubscriptions': 'REGSUB',
        'RegistrationSubscription/DeleteCSRegistrationSubscription': 'REGSUB',
        'RegistrationSubscription/CSRegistrationSubscriptionNotificationBehaviour': 'REGSUB',
        'RegistrationSUBBehaviour': 'REGSUB',
        'Discovery/RetrieveCSRegistration': 'DISC',
        'Discovery/QueryCSRegistrations': 'DISC',
        'CommonResponses/VerifyLdContextNotAvailable': 'HTTP',
        'CommonResponses/VerifyMergePatchJson': 'HTTP',
        'CommonResponses/VerifyGETWithoutAccept': 'HTTP',
        'CommonResponses/VerifyUnsupportedMediaType': 'HTTP',
        'CommonResponses/VerifyNotAcceptableMediaType': 'HTTP',
    }

    # Release tag -> reference prefix. This table is read as
    # ``self.references[version]`` when reference strings are built, so it
    # has to be its own attribute rather than an entry of ``identifier``.
    self.references = {
        'v1.3.1': 'ETSI GS CIM 009 V1.3.1 []',
    }

    # Setup keyword name -> text of the corresponding initial condition.
    self.initial_conditions = {
        'Setup Initial Entity':
            'with {\n the SUT containing an initial Entity ${entity} with an id set to ${entityId}\n}',
        'Initial State':
            'with {\n the SUT in the "initial state"\n}',
    }

    # Robot variable -> name of the HTTP header it carries.
    self.headers = {
        '${CONTENT_TYPE_LD_JSON}': 'Content-Type',
    }

    # Endpoint variable -> identifier placeholder appended to it.
    self.ids = {
        '${ENTITIES_ENDPOINT_PATH}': '${entityId}',
    }

    self.base_TP_id = str()
    self.initial_setup = InitialSetup()
# NOTE(review): the `def` line that owns the statements below is not present in
# this text, so the method name and its parameters cannot be confirmed here.
# Store the robot file location relative to '<execdir>/TP/NGSI-LD/', with the
# trailing '/<suite>.robot' removed.
self.test_suite['robotpath'] = (self.robot_file.replace(f'{self.execdir}/TP/NGSI-LD/', '')
.replace(f'/{self.robot.test_suite}.robot', ''))
# Store the suite name reported by the robot-file parser.
self.test_suite['robotfile'] = self.robot.test_suite
return self.test_suite
def parse_robot(self):
    """Visit the whole suite and fill ``self.test_suite``.

    Every test is visited first and the step data is resolved in a second
    pass, so all test-case entries exist before any of them is completed.
    """
    self.start_suite()

    # Plain loops: these calls are made only for their side effects.
    for test in self.suite.tests:
        self.visit_test(test=test)
    for test in self.suite.tests:
        self.get_step_data(test=test.name)

    self.test_suite['test_cases'] = self.test_cases

    # Generate the permutation keys to provide the keys in test_cases list that are different
    self.test_suite['permutations'] = self.get_permutation_keys(data=self.test_suite['test_cases'])

    # Generate the initial_condition of the test suite
    self.test_suite['initial_condition'] = self.generate_initial_condition()

    # Generate the parent release and correct the reference in case that the tags include information since_v1.x.y
    self.get_version()
def get_version(self):
    """Derive the parent release from the ``since_*`` tags and fix the reference.

    When every ``since...`` tag of the test cases is the same, the text
    after ``since_`` is the release; otherwise ``v1.3.1`` is used (with a
    printed warning when the tags disagree). The release number is then
    written into ``self.test_suite['reference']`` in front of the existing
    clause list, and the release is stored under ``'parent_release'``.
    """
    tags_per_case = [case['tags'] for case in self.test_suite['test_cases']]
    since_tags = [tag for tags in tags_per_case for tag in tags if tag.startswith('since')]

    if since_tags and len(set(since_tags)) == 1:
        release = since_tags[0].split('since_')[1]
    else:
        if since_tags:
            print('Error not all the tags have the same version. Select default v1.3.1')
        release = 'v1.3.1'

    # The existing reference ends with either "clauses a, b" or "clause a";
    # only that tail is kept, the prefix is rebuilt from the release.
    label = 'clauses' if 'clauses' in self.test_suite['reference'] else 'clause'
    self.test_suite['reference'] = (
        f"ETSI GS CIM 009 V{release.split('v')[1]} [], "
        f"{label} {self.test_suite['reference'].split(label + ' ')[1]}"
    )

    self.test_suite['parent_release'] = release
lopezaguilar
committed
# NOTE(review): the `def` line for these statements is not present in this text;
# presumably this is the body of generate_initial_condition() — confirm against the
# original file.
# Collect the 'setup' value of every recorded test case.
aux = [x['setup'] for x in self.test_cases]
# Only when all test cases share the same setup can one initial condition be produced.
if all(element == aux[0] for element in aux[1:]):
aux = self.initial_setup.get_initial_condition(initial_condition=aux[0])
return aux
else:
# No value is returned on this path in the visible text; the mismatch is only reported.
print(f'Something went wrong, the test suite {self.suite.resource.source} '
f'has different "setup" values for its test cases.')
# NOTE(review): the `def` line, the initial assignment of `all_keys` and the final
# `return` are not present in this text; presumably this is the body of
# get_permutation_keys(data=...) — confirm against the original file.
# Keys that are never reported as differing between test cases.
excluded_keys = ['doc', 'permutation_tp_id', 'setup', 'teardown', 'name', 'tags']
all_keys = [x for x in all_keys if x not in excluded_keys]
lopezaguilar
committed
# Keep the keys for which at least one later entry of `data` differs from the first entry.
keys_with_different_values = [
key for key in all_keys if any(d.get(key) != data[0].get(key) for d in data[1:])
]
lopezaguilar
committed
keys_with_different_values.sort()
def get_data_template(self, string: str) -> str:
    """Return the text to analyse for a test case.

    Args:
        string: Text of the test case itself.

    Returns:
        ``string`` when the robot file declares no test template (its
        ``test_template_name`` is empty); otherwise the template text held
        in ``self.robot.string_test_template``.
    """
    if self.robot.test_template_name == '':
        return string
    return self.robot.string_test_template
# Extract, from the text of a test case, the request keyword that is executed and the
# list of parameters passed to it. Returns the pair (request, params).
# NOTE(review): in this text `params` is never initialised before `params.append(...)`
# and `regex` is only assigned in the single-line branch; both look like lines lost
# from the file — confirm against the original.
# NOTE(review): the ' ' / ' ... ' separators below may have had their whitespace
# collapsed in this copy — confirm against the original.
def get_params(self, test_case: str):
# When a test template is in use, the template text is analysed instead of the test case.
test_case = self.get_data_template(string=test_case)
# Lines that assign ${response} or ${notification}.
lines_starting_response = findall(r'^\s*\$\{response}.*|^\s*\$\{notification}.*', test_case, MULTILINE)
# If there is more than one line, it means that the test case has several operations, all of them to
# create the environment content to execute the last one, which is the correct one to test the Test Case
if (len(lines_starting_response) > 1 and
lopezaguilar
committed
any(map(lambda item: 'notification' in item, lines_starting_response)) is False):
# The last one corresponds to the execution of the test, the rest corresponds to the initial condition of
# test case...
response_to_check = lines_starting_response[-1]
else:
response_to_check = lines_starting_response[0]
# Work on the text from the selected line onwards, split into lines.
index = test_case.find(response_to_check)
aux = test_case[index:].split('\n')
# Get the list of params of the function, they are the keys
if ' ... ' in aux[1]:
# The request keyword is the second non-empty token of the first line.
request = aux[0].split(' ')
request = [x for x in request if x != ''][1]
# We are in the case that the attributes are in following lines
for i in range(1, len(aux)):
if ' ... ' in aux[i]:
lopezaguilar
committed
param = match(pattern=regex, string=aux[i])
params.append(param.groups()[1])
else:
# Stop at the first line that is not a continuation line.
break
else:
# the attributes are in the same line
regex = r"\s*\$\{response\}=\s{4}(.*)"
lopezaguilar
committed
matches = finditer(regex, response_to_check, MULTILINE)
request = aux[0].split(' ')[2]
# We have two options from here, or the parameters are defined in the same line or the parameters are
# defined in following lines, next lines
lopezaguilar
committed
for a_match in matches:
lopezaguilar
committed
if len(a_match.groups()) == 1:
aux = a_match.group(1)
# Get the list of keys
params = aux.split(' ')[1:]
else:
raise Exception(f"Error, unexpected format, received: '{response_to_check}'")
return request, params
# Complete the recorded test case named `test` with its HTTP verb, endpoint and
# 'when' text.
def get_step_data(self, test: str):
# Use the test's own text unless the robot file provides a test-template text.
if self.robot.string_test_template == '':
string = self.robot.test_cases[test]
else:
string = self.robot.string_test_template
request, params = self.get_params(test_case=string)
# NOTE(review): `data` is not defined anywhere in the visible method; presumably it is
# one of the parsed resource files in self.apiutils — confirm against the original file.
verb, url, query_param = data.get_response(keyword=request)
# Locate the entry of self.test_cases whose 'name' equals `test`.
index = None
for i, item in enumerate(self.test_cases):
if 'name' in item and item['name'] == test:
index = i
break
# NOTE(review): `index` stays None when no entry matches, which makes the lookups below fail.
self.test_cases[index]['http_verb'] = verb
self.test_cases[index]['endpoint'] = self.get_values_url(keys=url,
query_param=query_param,
request=request,
params=params)
# The existing 'when' value is passed in and replaced by the generated content.
self.test_cases[index]['when'] = self.robot.generate_when_content(http_verb=self.test_cases[index]['http_verb'],
endpoint=self.test_cases[index]['endpoint'],
when=self.test_cases[index]['when'])
# Look in `params` for a variable listed in self.headers and, when one is found, store
# the resolved header value in the recorded test case named `test`, under the header
# name given by self.headers.
# NOTE(review): the nesting of the branches below cannot be read from this text
# because indentation was lost — confirm against the original file.
def check_header_parameters(self, params: list, test: str):
value = str()
# 1st case: value of the parameters are sent as it is
header_key = [x for x in params if x in self.headers.keys()]
if len(header_key) != 0:
key = header_key[0]
value = self.get_header_value(key=key)
else:
# 2nd case, maybe the params are defined in the way <param>=<value>
for k in self.headers:
aux = [x for x in params if k in x]
if len(aux) != 0:
# The text after the first '=' is taken as the variable to resolve.
key = aux[0].split('=')[1]
value = self.get_header_value(key=key)
else:
key = None
value = None
if key is not None and value is not None:
# Iterate over the list and find the index
index = None
for i, item in enumerate(self.test_cases):
if 'name' in item and item['name'] == test:
index = i
break
# self.test_cases[index]['params'] = params
self.test_cases[index][self.headers[key]] = value
def get_values_url(self, keys: list, query_param: bool, request: str, params: list) -> str:
data = [self.get_value_url(key=x, request=request, params=params) for x in keys]
if not query_param:
data = '/'.join(data).replace('//', '/').replace('?/', '?')
else:
aux = '/'.join(data[:-1]).replace('//', '/').replace('?/', '?')
data = f"{aux}?{data[-1]}"
def get_value_url(self, key: str, request: str, params: list) -> str:
    """Resolve one URL component to its value.

    The variable ``$<key>`` is looked up, in order, in the variables of the
    parsed resource files (``self.apiutils``), in the variables of the robot
    file, and finally through ``check_resource_for_url``.

    Args:
        key: URL component, e.g. ``'{url}'``; ``'$'`` is prepended to it to
            form the variable name that is searched.
        request: Request keyword, forwarded to ``check_resource_for_url``.
        params: Request parameters, forwarded to ``check_resource_for_url``.

    Returns:
        The resolved value, or ``key`` unchanged when no source defines it.
    """
    key_to_search = f'${key}'

    # Later resource files override earlier ones for the same variable name.
    api_variables = {k: v for d in self.apiutils for k, v in d.variables.items()}
    try:
        return api_variables[key_to_search]
    except KeyError:
        pass

    # It is not defined in ApiUtils, maybe in Robot File?
    try:
        return self.robot.variables[key_to_search]
    except KeyError:
        pass

    # Maybe the url is defined in the proper resource file through an operation
    try:
        return self.check_resource_for_url(string=key_to_search, request=request, params=params)
    except KeyError:
        # The variable is not defined, so it is kept as it is in the url
        return key
# Try to resolve the variable `string` by inspecting the text of the resource files;
# raises KeyError when it cannot be resolved.
# NOTE(review): `url_dict` is never assigned in the visible text and there are more
# `else:` lines than `if` lines, so some lines appear to be missing — confirm against
# the original file.
def check_resource_for_url(self, string: str, request: str, params: list) -> str:
# Concatenated text of every parsed resource file.
data_file_contents = '\n'.join([x.file_contents for x in self.apiutils])
flattened_list_variables = {k: v for d in self.apiutils for k, v in d.variables.items()}
# Keep only the part of each variable name that precedes the first space.
flattened_list_variables = {key.split(' ')[0]: value for key, value in flattened_list_variables.items()}
# Take the text from the first occurrence of `string` up to the end of that line.
index1 = data_file_contents.find(string)
index2 = data_file_contents[index1:].find("\n")
line = data_file_contents[index1:index1 + index2]
if string in line:
if 'Get From Dictionary' in line:
# We have to obtain the information of the endpoint from the dictionary
aux = line.split("Get From Dictionary")[1].strip().split(" ")
key = aux[0]
value = aux[1]
if request == 'Batch Request Entities From File':
url = url_dict[params[0]]
else:
raise KeyError
return url
else:
raise KeyError
else:
raise KeyError
else:
raise KeyError
# Resolve the value of a header variable; `key` holds either one variable or two
# concatenated variables (decided by counting '$').
# NOTE(review): the first `try:` has no body and the second branch has an `except`
# that does not follow a `try` body in this text, so the ApiUtils lookups appear to be
# missing — confirm against the original file.
def get_header_value(self, key: str):
value = str()
# We can have a simple variable or a compound of two variables
count = key.count('$')
if count == 1:
# Get the value of the Header key
try:
except KeyError:
# It is not defined in ApiUtils, maybe in Robot File
try:
value = self.robot.variables[key]
except KeyError:
# ERROR, the header key is not defined
raise Exception(f"ERROR, the header key '{key}' is undefined")
elif count == 2:
# Split the compound into its two variables, restoring the leading '$' of each.
keys = key.split("$")
key = f'${keys[1]}'
second_key = f'${keys[2]}'
try:
# The second part is replaced by the placeholder registered in self.ids for the first variable.
second_key = self.ids[key]
except KeyError:
raise Exception(f"ERROR: Need to manage the '{second_key}' in GenerateRobotData::self.ids")
value = f'{value}{second_key}'
except KeyError:
# It is not defined in ApiUtils, maybe in Robot File
try:
value = self.robot.variables[key]
value = f'{value}{second_key}'
except KeyError:
# ERROR, the header key is not defined
raise Exception(f"ERROR, the header key '{key}' is undefined")
return value
def start_suite(self):
    """Create the initial ``self.test_suite`` dictionary for the suite.

    The TP identifier and the reference/clauses are generated with the
    default version ``v1.3.1``; the remaining entries start empty and are
    filled in by later steps.
    """
    default_version = 'v1.3.1'
    identifier = self.generate_name()
    reference, clauses = self.generate_reference(version=default_version)

    # TODO: robotframework==7.0 has not property keywords in the class TestSuite(), we can get the data from
    # [x.to_dict()['name'] for x in list(self.suite.resource.keywords)] but with some differences in execution code
    suite_info = dict()
    suite_info['tp_id'] = identifier
    suite_info['test_objective'] = self.suite.doc
    suite_info['reference'] = reference
    suite_info['config_id'] = str()
    suite_info['clauses'] = clauses
    suite_info['pics_selection'] = str()
    # 'keywords': [str(x) for x in self.suite.keywords],
    suite_info['keywords'] = [keyword.to_dict()['name'] for keyword in list(self.suite.resource.keywords)]
    suite_info['initial_condition'] = str()
    suite_info['test_cases'] = list()

    self.test_suite = suite_info
# Collect the data of one test (tags, documentation, request and checks) into a
# test-case dictionary.
# NOTE(review): in this text the `else:` that separates the template and non-template
# branches is missing, `tags`, `documentation`, `when` and `then` are computed but not
# stored, and `test_case` is never appended to self.test_cases — lines appear to be
# lost; confirm against the original file.
def visit_test(self, test):
# Get Tags associated to the test
if len(test.tags) == 0 and self.tags_template is not None:
tags = self.tags_template
else:
tags = list(test.tags)
# Get the Documentation associated to the test
if len(test.doc) == 0 and len(self.documentation_template) != 0:
documentation = self.documentation_template
else:
documentation = test.doc
# Get the Content-Type and Body associated to the Test
if len(self.args) != 0:
lopezaguilar
committed
# We are talking about Test Cases with Test Template, so we need to check the keyword content with the
# definition of the template
# Generate Checks for Test Data
then = self.robot.get_checks(test_name=test.template, apiutils=self.apiutils, name=test.name)
# Generate Request for Test Data
when = self.robot.get_request(test_name=test.template, name=test.name)
lopezaguilar
committed
# We are talking about a Test Cases without Test Template
# Generate Checks for Test Data
then = self.robot.get_checks(test_name=test.name, apiutils=self.apiutils, name=test.name)
when = self.robot.get_request(test_name=test.name, name=test.name)
test_case = {
'name': test.name,
# Permutation id: the suite's base TP id plus the first space-separated token of the test name.
'permutation_tp_id': f'{self.base_TP_id}/{test.name.split(" ")[0]}',
'setup': test.setup.name,
'teardown': test.teardown.name,
}
# NOTE(review): the try/except below may belong to a different definition whose
# `def` line is not visible here.
try:
# Use the initial condition registered for this setup keyword...
self.test_suite['initial_condition'] = self.initial_conditions[test.setup.name]
string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
except KeyError:
# ...and fall back to the 'Initial State' text for unknown setup keywords.
self.test_suite['initial_condition'] = self.initial_conditions['Initial State']
string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
# Turn the tokens that follow the first one in `string` into text, inserting a space
# at each lower-case/upper-case boundary.
# NOTE(review): the final `else:` and the `return` are not present in this text —
# confirm against the original file.
def get_body(self, string: str) -> str:
# Drop the first space-separated token.
aux = string.split(' ')[1:]
if len(aux) >= 2:
# Several tokens: process each one recursively and join the results with spaces.
aux = [self.get_body(x) for x in aux]
aux = ' '.join(aux)
elif len(aux) == 1:
lopezaguilar
committed
# Insert a space between a lower-case letter and the upper-case letter that follows it.
aux = sub(r'([a-z])([A-Z])', r'\1 \2', aux[0])
lopezaguilar
committed
aux = sub(r'([a-z])([A-Z])', r'\1 \2', string)
# NOTE(review): the `def` line for these statements is not present in this text;
# presumably this is the body of generate_name(), which start_suite() calls to obtain
# the TP identifier — confirm against the original file.
# Directory three levels above this file.
base_dir = dirname(dirname(dirname(__file__)))
# Path of the suite's folder with the '<base_dir>/' prefix removed.
tp_id = str(self.suite.source.parent).replace(f'{base_dir}/', "")
# Replace every path fragment listed in self.identifier by its abbreviation.
for key, value in self.identifier.items():
tp_id = tp_id.replace(key, value)
# The abbreviated path is kept as the base id of the suite.
self.base_TP_id = tp_id
# The full identifier appends the suite name with spaces turned into underscores.
name = self.suite.name.replace(" ", "_")
tp_id = f'{self.base_TP_id}/{name}'
return tp_id
def generate_reference(self, version):
    """Build the reference text and the clause list of the suite.

    Args:
        version: Release tag used to select the reference prefix.

    Returns:
        A ``(reference, clauses)`` pair. When no test carries a tag that
        starts with a digit the pair comes from the test template; otherwise
        it comes from the test-case tags.
    """
    # Get the tags of the different tests that start with a digit. Slicing
    # (instead of indexing) keeps an empty tag from raising IndexError.
    tags = [tag for test in self.suite.tests for tag in test.tags if tag[:1].isdigit()]

    if len(tags) == 0:
        # We have different tests cases that call a test template, maybe the Tags are defined in the template
        reference, clauses = self.generate_reference_template(version=version)
    elif len(self.robot.test_template_name) == 0:
        # We have normal tests cases
        reference, clauses = self.generate_reference_testcases(tags=tags, version=version)
    else:
        # We have tests cases with information about tags but a template with information about documentation
        reference, clauses = self.generate_reference_testcases(tags=tags, version=version)
        # Called only for its side effects; its result is not used.
        self.generate_reference_template(version=version, need_tags=False)

    return reference, clauses
def generate_reference_template(self, version, need_tags=True):
# Get the list of arguments, we select the first one because the 2nd keyword corresponds
# to the teardown operation
args = [list(x.keywords)[0] for x in self.suite.tests]
args = [{str(x.parent): list(x.args)} for x in args]
self.args = dict()
_ = [self.args.update(x) for x in args]
template_name = list(set([list(x.keywords)[0].name for x in self.suite.tests]))[0]
# Due to the information of the tags are contained in the Keyword description of the template, we need to
# analyse the Keyword.
string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
reference, clauses = self.get_info_from_template(name=template_name,
string=string,
version=version,
need_tags=need_tags)
return reference, clauses
def get_info_from_template(self, name: str, string: str, version: str, need_tags: bool):
    """Read tags, arguments and documentation of a template from ``string``.

    Args:
        name: Name of the template (currently not checked, see TODO).
        string: Text in which ``[Tags]``, ``[Arguments]`` and
            ``[Documentation]`` are searched.
        version: Release tag used to select the reference prefix.
        need_tags: When false, the ``[Tags]`` entry is not analysed and an
            empty reference with no clauses is returned.

    Returns:
        A ``(reference, clauses)`` pair; ``clauses`` is sorted and holds the
        tags that start with a digit, with ``_`` replaced by ``.``.

    Raises:
        Exception: When ``need_tags`` is true and no tag starts with a digit.

    Side effects:
        Sets ``self.arguments`` and ``self.documentation_template``.
    """
    # TODO: Check that the name of the template is in the string receive
    clauses = list()
    reference = str()

    if need_tags:
        tags = self.get_substring(string=string, key='[Tags]')

        # Slicing keeps empty tokens from raising; the set removes duplicates.
        clauses = sorted({element.replace("_", ".") for element in tags if element[:1].isdigit()})

        if len(clauses) == 0:
            raise Exception("ERROR, Probably [Tags] does not include reference to the section in the spec.")

        if len(clauses) == 1:
            reference = f'{self.references[version]}, clause {clauses[0]}'
        else:
            reference = f'{self.references[version]}, clauses {", ".join(clauses)}'

    # clauses = f'PICS_{tag}'

    # The first token of each entry is the key itself, so it is dropped.
    self.arguments = self.get_substring(string=string, key='[Arguments]')[1:]

    # Get the documentation
    documentation = self.get_substring(string=string, key='[Documentation]')
    self.documentation_template = documentation[1:][0]

    return reference, clauses
@staticmethod
def get_substring(string: str, key: str):
pos1 = string.find(key) - 1
pos2 = string[pos1:].find('\n')
result = string[pos1:pos1+pos2].split(' ')
return result
def generate_reference_testcases(self, tags: list, version: str):
    """Build the reference text and clause list from test-case tags.

    Args:
        tags: Tags of the test cases; those that start with ``N_N`` or
            ``N_N_N`` (digits) are taken as clause numbers.
        version: Release tag used to select the reference prefix.

    Returns:
        A ``(reference, clauses)`` pair; ``clauses`` is sorted, without
        duplicates, with ``_`` replaced by ``.``. The reference uses
        ``clause`` for a single entry and ``clauses`` for several.

    Raises:
        Exception: When no tag describes a clause.
    """
    clauses = sorted({x.replace("_", ".")
                      for x in tags
                      if match(pattern=r'^(\d+_\d+_\d+)|^(\d+_\d+)', string=x)})

    if len(clauses) == 0:
        raise Exception(
            f'ERROR: the Test Suite {self.suite.name} has different clauses or no clauses (Tags): {tags}\n'
            f'Unable to select the corresponding Reference of this Test Suite')

    if len(clauses) == 1:
        reference = f'{self.references[version]}, clause {clauses[0]}'
    else:
        reference = f'{self.references[version]}, clauses {", ".join(clauses)}'

    return reference, clauses