Skip to content
generaterobotdata.py 24.1 KiB
Newer Older
from os.path import dirname
from robot.api import TestSuiteBuilder
from analysis.parserobotfile import ParseRobotFile
from analysis.parseapiutilsfile import ParseApiUtilsFile
from analysis.parsevariablesfile import ParseVariablesFile
from analysis.initial_setup import InitialSetup
from re import match, findall, finditer, sub, MULTILINE


class GenerateRobotData:
    def __init__(self, robot_file: str, execdir: str):
        self.robot_file = robot_file
        self.execdir = execdir
        self.suite = TestSuiteBuilder().build(robot_file)
        self.config_variables = ParseVariablesFile()
        self.robot = ParseRobotFile(filename=robot_file, execdir=execdir, config_file=self.config_variables)
        # TODO: the robot file can provide several ApiUtils not only one
lopezaguilar's avatar
lopezaguilar committed
        self.apiutils = [ParseApiUtilsFile(filename=file) for file in self.robot.resource_files]
        self.robot.set_apiutils(self.apiutils)
        self.test_cases = list()
        self.test_suite = dict()
        self.tags_template = list()
        self.documentation_template = str()
        self.arguments = list()
        self.args = list()
        self.identifier = {
            'ContextInformation': 'CI',
            'CommonBehaviours': 'CB',
            'Consumption': 'Cons',
            'Provision': 'Prov',
lopezaguilar's avatar
lopezaguilar committed
            'ContextSource': 'CS',
            'Discovery/RetrieveAvailableAttributeInformation': 'DISC',
            'Discovery/RetrieveAvailableEntityTypeInformation': 'DISC',
            'Discovery/RetrieveAvailableEntityTypes': 'DISC',
            'Discovery/RetrieveDetailsOfAvailableEntityTypes': 'DISC',
            'Discovery/RetrieveAvailableAttributes': 'DISC',
            'Discovery/RetrieveDetailsOfAvailableAttributes': 'DISC',
            'Entity/RetrieveEntity': 'E',
            'Entities/CreateEntity': 'E',
lopezaguilar's avatar
lopezaguilar committed
            'Entity/QueryEntities': 'E',
lopezaguilar's avatar
lopezaguilar committed
            'Entities/DeleteEntity': 'E',
            'EntityAttributes/AppendEntityAttributes': 'EA',
            'EntityAttributes/UpdateEntityAttributes': 'EA',
            'EntityAttributes/PartialAttributeUpdate': 'EA',
            'EntityAttributes/DeleteEntityAttribute': 'EA',
lopezaguilar's avatar
lopezaguilar committed
            'BatchEntities/CreateBatchOfEntities': 'BE',
            'BatchEntities/UpsertBatchOfEntities': 'BE',
lopezaguilar's avatar
lopezaguilar committed
            'BatchEntities/UpdateBatchOfEntities': 'BE',
            'BatchEntities/DeleteBatchOfEntities': 'BE',
lopezaguilar's avatar
lopezaguilar committed
            'TemporalEntity/QueryTemporalEvolutionOfEntities': 'TE',
            'TemporalEntity/DeleteTemporalRepresentationOfEntity': 'TE',
            'TemporalEntity/UpdateTemporalRepresentationOfEntity': 'TE',
            'TemporalEntity/RetrieveTemporalEvolutionOfEntity': 'TE',
            'TemporalEntity/CreateTemporalRepresentationOfEntity': 'TE',
lopezaguilar's avatar
lopezaguilar committed
            'TemporalEntityAttributes/DeleteAttributeInstance': 'TEA',
            'TemporalEntityAttributes/DeleteAttribute': 'TEA',
lopezaguilar's avatar
lopezaguilar committed
            'TemporalEntityAttributes/PartialUpdateAttributeInstance': 'TEA',
            'TemporalEntityAttributes/AddAttributes': 'TEA',
lopezaguilar's avatar
lopezaguilar committed
            'Subscription/CreateSubscription': 'SUB',
lopezaguilar's avatar
lopezaguilar committed
            'Subscription/DeleteSubscription': 'SUB',
            'Subscription/QuerySubscriptions': 'SUB',
            'Subscription/RetrieveSubscription': 'SUB',
            'Subscription/UpdateSubscription': 'SUB',
lopezaguilar's avatar
lopezaguilar committed
            'Subscription/SubscriptionNotificationBehaviour': 'SUB',
lopezaguilar's avatar
lopezaguilar committed
            'Registration/CreateContextSourceRegistration': 'REG',
            'Registration/CreateCSRegistration': 'REG',
            'Registration/UpdateCSRegistration': 'REG',
            'Registration/DeleteCSRegistration': 'REG',
lopezaguilar's avatar
lopezaguilar committed
            'Registration/RegisterCS': 'CSR',
lopezaguilar's avatar
lopezaguilar committed
            'RegistrationSubscription/CreateCSRegistrationSubscription': 'REGSUB',
            'RegistrationSubscription/UpdateCSRegistrationSubscription': 'REGSUB',
            'RegistrationSubscription/RetrieveCSRegistrationSubscription': 'REGSUB',
            'RegistrationSubscription/QueryCSRegistrationSubscriptions': 'REGSUB',
            'RegistrationSubscription/DeleteCSRegistrationSubscription': 'REGSUB',
lopezaguilar's avatar
lopezaguilar committed
            'RegistrationSubscription/CSRegistrationSubscriptionNotificationBehaviour': 'REGSUB',
            'RegistrationSUBBehaviour': 'REGSUB',
            'Discovery/RetrieveCSRegistration': 'DISC',
            'Discovery/QueryCSRegistrations': 'DISC',
            'CommonResponses/VerifyLdContextNotAvailable': 'HTTP',
            'CommonResponses/VerifyMergePatchJson': 'HTTP',
            'CommonResponses/VerifyGETWithoutAccept': 'HTTP',
            'CommonResponses/VerifyUnsupportedMediaType': 'HTTP',
            'CommonResponses/VerifyNotAcceptableMediaType': 'HTTP'
        }
        self.references = {
            'v1.3.1': 'ETSI GS CIM 009 V1.3.1 []'
        }
        self.initial_conditions = {
            'Setup Initial Entity':
                'with {\n    the SUT containing an initial Entity ${entity} with an id set to ${entityId}\n}',
            'Initial State':
                'with {\n    the SUT in the "initial state"\n}'
        }
        self.headers = {
            '${CONTENT_TYPE_LD_JSON}': 'Content-Type'
        }
        self.ids = {
            '${ENTITIES_ENDPOINT_PATH}': '${entityId}'
        }
        self.base_TP_id = str()

    def get_info(self):
        self.test_suite['robotpath'] = (self.robot_file.replace(f'{self.execdir}/TP/NGSI-LD/', '')
                                        .replace(f'/{self.robot.test_suite}.robot', ''))
        self.test_suite['robotfile'] = self.robot.test_suite
        return self.test_suite

    def parse_robot(self):
        self.start_suite()
        _ = [self.visit_test(test=x) for x in self.suite.tests]

        _ = [self.get_step_data(test=x.name) for x in self.suite.tests]
        self.test_suite['test_cases'] = self.test_cases

        # Generate the permutation keys to provide the keys in test_cases list that are different
lopezaguilar's avatar
lopezaguilar committed
        self.test_suite['permutations'] = self.get_permutation_keys(data=self.test_suite['test_cases'])

        # Generate the initial_condition of the test suite
        self.test_suite['initial_condition'] = self.generate_initial_condition()

    def generate_initial_condition(self) -> str :
        aux = [x['setup'] for x in self.test_cases]
        if all(element == aux[0] for element in aux[1:]):
            aux = self.initial_setup.get_initial_condition(initial_condition=aux[0])
            return aux
        else:
            print(f'Something went wrong, the test suite {self.suite.resource.source} '
                  f'has different "setup" values for its test cases.')

lopezaguilar's avatar
lopezaguilar committed
    def get_permutation_keys(self, data):
        all_keys = set().union(*data)
        excluded_keys = ['doc', 'permutation_tp_id', 'setup', 'teardown', 'name', 'tags']
lopezaguilar's avatar
lopezaguilar committed
        all_keys = [x for x in all_keys if x not in excluded_keys]
lopezaguilar's avatar
lopezaguilar committed
        keys_with_different_values = [
            key for key in all_keys if any(d.get(key) != data[0].get(key) for d in data[1:])
        ]

lopezaguilar's avatar
lopezaguilar committed
        return keys_with_different_values

lopezaguilar's avatar
lopezaguilar committed
    def get_data_template(self, string: str) -> str:
        if self.robot.test_template_name != '':
            result = self.robot.string_test_template
        else:
            result = string

        return result

    def get_params(self, test_case: str):
lopezaguilar's avatar
lopezaguilar committed
        test_case = self.get_data_template(string=test_case)
        lines_starting_response = findall(r'^\s*\$\{response\}.*|^\s*\$\{notification\}.*', test_case, MULTILINE)

        # If there is more than one line, it means that the test case has several operations, all of them to
        # create the environment content to execute the last one, which is the correct one to test the Test Case
        if (len(lines_starting_response) > 1 and
                any(map(lambda item: 'notification' in item, lines_starting_response)) is False):
            # The last one corresponds to the execution of the test, the rest corresponds to the initial condition of
            # test case...
            response_to_check = lines_starting_response[-1]
        else:
            response_to_check = lines_starting_response[0]

        index = test_case.find(response_to_check)
        aux = test_case[index:].split('\n')
lopezaguilar's avatar
lopezaguilar committed
        aux = [x for x in aux if x != '']
        params = list()
        request = str()
lopezaguilar's avatar
lopezaguilar committed

        # Get the list of params of the function, they are the keys
        if '    ...    ' in aux[1]:
            request = aux[0].split('    ')
            request = [x for x in request if x != ''][1]

            # We are in the case that the attributes are in following lines
            for i in range(1, len(aux)):
                if '    ...    ' in aux[i]:
                    regex = '(\s{4})*\s{4}\.{3}\s{4}(.*)'
                    if aux:
                        params.append(param.groups()[1])
                else:
                    break
        else:
            # the attributes are in the same line
            regex = r"\s*\$\{response\}=\s{4}(.*)"
            matches = finditer(regex, response_to_check, MULTILINE)
            request = aux[0].split('    ')[2]
            # We have two options from here, or the parameters are defined in the same line or the parameters are
            # defined in following lines, next lines
                # Check that we have 1 group matched
                if len(a_match.groups()) == 1:
                    aux = a_match.group(1)

                    # Get the list of keys
                    params = aux.split('    ')[1:]
                else:
                    raise Exception(f"Error, unexpected format, received: '{response_to_check}'")

        return request, params

    def get_step_data(self, test: str):
        if self.robot.string_test_template == '':
            string = self.robot.test_cases[test]
        else:
            string = self.robot.string_test_template
        request, params = self.get_params(test_case=string)
lopezaguilar's avatar
lopezaguilar committed

        for data in self.apiutils:
            verb, url, query_param = data.get_response(keyword=request)
lopezaguilar's avatar
lopezaguilar committed
            if verb != '':
                break

        index = None
        for i, item in enumerate(self.test_cases):
            if 'name' in item and item['name'] == test:
                index = i
                break

        self.test_cases[index]['http_verb'] = verb
        self.test_cases[index]['endpoint'] = self.get_values_url(keys=url,
                                                                 query_param=query_param,
                                                                 request=request,
                                                                 params=params)

        self.test_cases[index]['when'] = self.robot.generate_when_content(http_verb=self.test_cases[index]['http_verb'],
                                                                          endpoint=self.test_cases[index]['endpoint'],
                                                                          when=self.test_cases[index]['when'])

    def check_header_parameters(self, params: list, test: str):
        value = str()

        # 1st case: value of the parameters are sent as it is
        header_key = [x for x in params if x in self.headers.keys()]
        if len(header_key) != 0:
            key = header_key[0]
            value = self.get_header_value(key=key)
        else:
            # 2nd case, maybe the params are defined in the way <param>=<value>
            for k in self.headers:
                aux = [x for x in params if k in x]

            if len(aux) != 0:
                key = aux[0].split('=')[1]
                value = self.get_header_value(key=key)
            else:
                key = None
                value = None

        if key is not None and value is not None:
            # Iterate over the list and find the index
            index = None
            for i, item in enumerate(self.test_cases):
                if 'name' in item and item['name'] == test:
                    index = i
                    break

            # self.test_cases[index]['params'] = params
            self.test_cases[index][self.headers[key]] = value

    def get_values_url(self, keys: list, query_param: bool, request: str, params: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        data = [self.get_value_url(key=x, request=request, params=params) for x in keys]
            data = '/'.join(data).replace('//', '/').replace('?/','?')
        else:
            aux = '/'.join(data[:-1]).replace('//', '/').replace('?/','?')
            data = f"{aux}?{data[-1]}"

lopezaguilar's avatar
lopezaguilar committed
    def get_value_url(self, key: str, request: str, params: list) -> str:
        key_to_search = f'${key}'
        try:
lopezaguilar's avatar
lopezaguilar committed
            flattened_list_variables = {k: v for d in self.apiutils for k, v in d.variables.items()}
            value = flattened_list_variables[key_to_search]
        except KeyError:
            # It is not defined in ApiUtils, maybe in Robot File?
            try:
                value = self.robot.variables[key_to_search]
            except KeyError:
lopezaguilar's avatar
lopezaguilar committed
                # Maybe the url is defined in the proper resource file through an operation
                try:
                    value = self.check_resource_for_url(string=key_to_search, request=request, params=params)
                except KeyError:
                    # The variable is not defined, so it is keep as it is in the url
                    value = key
lopezaguilar's avatar
lopezaguilar committed
    def check_resource_for_url(self, string: str, request: str, params: list) -> str:
lopezaguilar's avatar
lopezaguilar committed
        data_file_contents = '\n'.join([x.file_contents for x in self.apiutils])
        flattened_list_variables = {k: v for d in self.apiutils for k, v in d.variables.items()}
        flattened_list_variables = {key.split(' ')[0]: value for key, value in flattened_list_variables.items()}

        index1 = data_file_contents.find(string)
lopezaguilar's avatar
lopezaguilar committed

        if index1 != -1:
lopezaguilar's avatar
lopezaguilar committed
            index2 = data_file_contents[index1:].find("\n")
            line = data_file_contents[index1:index1 + index2]
lopezaguilar's avatar
lopezaguilar committed

            if string in line:
                if 'Get From Dictionary' in line:
                    # We have to obtain the information of the endpoint from the dictionary
                    aux = line.split("Get From Dictionary")[1].strip().split("    ")
                    key = aux[0]
                    value = aux[1]
lopezaguilar's avatar
lopezaguilar committed
                    url_dict = flattened_list_variables[key]
lopezaguilar's avatar
lopezaguilar committed

                    if request == 'Batch Request Entities From File':
                        url = url_dict[params[0]]
                    else:
                        raise KeyError

                    return url
                else:
                    raise KeyError
            else:
                raise KeyError
        else:
            raise KeyError

    def get_header_value(self, key: str):
        value = str()
        # We can have a simple variable or a compound of two variables
        count = key.count('$')
        if count == 1:
            # Get the value of the Header key
            try:
lopezaguilar's avatar
lopezaguilar committed
                value = self.apiutils.variables[key]
            except KeyError:
                # It is not defined in ApiUtils, maybe in Robot File
                try:
                    value = self.robot.variables[key]
                except KeyError:
                    # ERROR, the header key is not defined
lopezaguilar's avatar
lopezaguilar committed
                    raise Exception(f"ERROR, the header key '{key}' is undefined")
        elif count == 2:
            keys = key.split("$")
            key = f'${keys[1]}'
            second_key = f'${keys[2]}'

            try:
                second_key = self.ids[key]
            except KeyError:
lopezaguilar's avatar
lopezaguilar committed
                raise Exception(f"ERROR: Need to manage the '{second_key}' in GenerateRobotData::self.ids")
            # Get the value of the Header key
            try:
lopezaguilar's avatar
lopezaguilar committed
                value = self.apiutils.variables[key]
                value = f'{value}{second_key}'
            except KeyError:
                # It is not defined in ApiUtils, maybe in Robot File
                try:
                    value = self.robot.variables[key]
                    value = f'{value}{second_key}'
                except KeyError:
                    # ERROR, the header key is not defined
lopezaguilar's avatar
lopezaguilar committed
                    raise Exception(f"ERROR, the header key '{key}' is undefined")

        return value

    def start_suite(self):
        """Modify suite's tests to contain only every Xth."""
        version = 'v1.3.1'
        tp_id = self.generate_name()
        reference, clauses = self.generate_reference(version=version)

        self.test_suite = {
            'tp_id': tp_id,
            'test_objective': self.suite.doc,
            'reference': reference,
            'parent_release': version,
            'clauses': clauses,
            'pics_selection': str(),
            'keywords': [str(x) for x in self.suite.keywords],
lopezaguilar's avatar
lopezaguilar committed
            'teardown': str(self.suite.teardown),
            'test_cases': list()
        }

    def visit_test(self, test):
        # Get Tags associated to the test
        if len(test.tags) == 0 and self.tags_template is not None:
            tags = self.tags_template
        else:
            tags = list(test.tags)

        # Get the Documentation associated to the test
lopezaguilar's avatar
lopezaguilar committed
        if len(test.doc) == 0 and len(self.documentation_template) != 0:
            documentation = self.documentation_template
        else:
            documentation = test.doc

        # Get the Content-Type and Body associated to the Test
        if len(self.args) != 0:
            # We are talking about Test Cases with Test Template, so we need to check the keyword content with the
            # definition of the template

            # Generate Checks for Test Data
lopezaguilar's avatar
lopezaguilar committed
            then = self.robot.get_checks(test_name=test.template, apiutils=self.apiutils, name=test.name)
lopezaguilar's avatar
lopezaguilar committed

            # Generate Request for Test Data
            when = self.robot.get_request(test_name=test.template, name=test.name)
        else:
            # We are talking about a Test Cases without Test Template
            # Generate Checks for Test Data
lopezaguilar's avatar
lopezaguilar committed
            then = self.robot.get_checks(test_name=test.name, apiutils=self.apiutils, name=test.name)
            # Generate Request for Test Data
lopezaguilar's avatar
lopezaguilar committed
            when = self.robot.get_request(test_name=test.name, name=test.name)
        test_case = {
            'name': test.name,
            'permutation_tp_id': f'{self.base_TP_id}/{test.name.split(" ")[0]}',
            'doc': documentation,
            'tags': tags,
            'setup': test.setup.name,
            'teardown': test.teardown.name,
            'template': test.template,
            'then': then,
            'when': when
        }

        try:
            self.test_suite['initial_condition'] = self.initial_conditions[test.setup.name]
            string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
        except KeyError:
            self.test_suite['initial_condition'] = self.initial_conditions['Initial State']
            string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)

        self.test_cases.append(test_case)

    def get_body(self, string: str) -> str:
        aux = string.split(' ')[1:]

        if len(aux) >= 2:
            aux = [self.get_body(x) for x in aux]
            aux = ' '.join(aux)
        elif len(aux) == 1:
        else:
    def generate_name(self):
        base_dir = dirname(dirname(dirname(__file__)))
        tp_id = str(self.suite.source.parent).replace(f'{base_dir}/', "")

        for key, value in self.identifier.items():
            tp_id = tp_id.replace(key, value)

        self.base_TP_id = tp_id

        name = self.suite.name.replace(" ", "_")
        tp_id = f'{self.base_TP_id}/{name}'

        return tp_id

    def generate_reference(self, version):
        # Get the list of tags in the different tests
        tags = [x.tags for x in self.suite.tests]
        tags = [element for sublist in tags for element in sublist if element[0].isdigit()]

        if len(tags) == 0:
lopezaguilar's avatar
lopezaguilar committed
            # We have different tests cases that call a test template, maybe the Tags are defined in the template
            reference, clauses = self.generate_reference_template(version=version)
        else:
lopezaguilar's avatar
lopezaguilar committed
            if len(self.robot.test_template_name) == 0:
                # We have normal tests cases
                reference, clauses = self.generate_reference_testcases(tags=tags, version=version)
lopezaguilar's avatar
lopezaguilar committed
            else:
                # We have tests cases with information about tags but a template with information about documentation
                reference, clauses = self.generate_reference_testcases(tags=tags, version=version)
lopezaguilar's avatar
lopezaguilar committed
                _, _ = self.generate_reference_template(version=version, need_tags=False)
lopezaguilar's avatar
lopezaguilar committed
    def generate_reference_template(self, version, need_tags=True):
        # Get the list of arguments, we select the first one because the 2nd keyword corresponds
        # to the teardown operation
        args = [list(x.keywords)[0] for x in self.suite.tests]
        args = [{str(x.parent): list(x.args)} for x in args]
        self.args = dict()
        _ = [self.args.update(x) for x in args]

        template_name = list(set([list(x.keywords)[0].name for x in self.suite.tests]))[0]

        # Due to the information of the tags are contained in the Keyword description of the template, we need to
        # analyse the Keyword.
        string = self.robot.get_substring(initial_string='** Keywords ***', final_string='', include=False)
        reference, clauses = self.get_info_from_template(name=template_name,
                                                         string=string,
                                                         version=version,
                                                         need_tags=need_tags)
lopezaguilar's avatar
lopezaguilar committed
    def get_info_from_template(self, name: str, string: str, version: str, need_tags: bool):
lopezaguilar's avatar
lopezaguilar committed
        # TODO: Check that the name of the template is in the string receive
        # Get the Tags line and the tag value
lopezaguilar's avatar
lopezaguilar committed
        reference = str()
lopezaguilar's avatar
lopezaguilar committed

        tags = self.get_substring(string=string, key='[Tags]')
        self.tags_template = tags[1:]

lopezaguilar's avatar
lopezaguilar committed
        if need_tags:
            try:
                clauses = list(set([element.replace("_", ".")
                                    for sublist in tags for element in tags if element[0].isdigit()]))
                clauses.sort()
lopezaguilar's avatar
lopezaguilar committed
            except IndexError:
                raise Exception("ERROR, Probably [Tags] does not include reference to the section in the spec.")
lopezaguilar's avatar
lopezaguilar committed

            if len(clauses) == 1:
                reference = f'{self.references[version]}, clause {clauses[0]}'
            elif len(clauses) > 1:
                reference = f'{self.references[version]}, clauses {", ".join(clauses)}'

            # clauses = f'PICS_{tag}'

        # Get the arguments
        self.arguments = self.get_substring(string=string, key='[Arguments]')
        self.arguments = self.arguments[1:]

        # Get the documentation
        self.documentation_template = self.get_substring(string=string, key='[Documentation]')
        self.documentation_template = self.documentation_template[1:][0]

    def get_substring(self, string: str, key: str):
        pos1 = string.find(key) - 1
        pos2 = string[pos1:].find('\n')
        result = string[pos1:pos1+pos2].split('    ')

        return result

    def generate_reference_testcases(self, tags: list, version: str):
        clauses = [x.replace("_", ".") for x in tags if match(pattern=r'^(\d+_\d+_\d+)|^(\d+_\d+)', string=x)]
        clauses = list(set(clauses))
        clauses.sort()

        if len(clauses) == 0:
lopezaguilar's avatar
lopezaguilar committed
            raise Exception(
                f'ERROR: the Test Suite {self.suite.name} has different clauses or no clauses (Tags): {tags}\n'
                f'Unable to select the corresponding Reference of this Test Suite')
        elif len(clauses) == 1:
            reference = f'{self.references[version]}, clause {clauses[0]}'
        else:
            reference = f'{self.references[version]}, clauses {", ".join(clauses)}'