Commit f0f09033 authored by lopezaguilar's avatar lopezaguilar
Browse files

Merge branch 'logparser' into 'testsuitedata'

Create ErrorListener and Test Suite Manager

See merge request !132
parents ba2bfe8f 069b03c5
Loading
Loading
Loading
Loading
+123 −1
Original line number Diff line number Diff line
@@ -84,6 +84,20 @@ Docker host.
``` ifconfig docker0 | grep inet | awk '{print $2}' ``` and that IP is the one that you need to use for 
notification purposes.

Optionally, there are some extra variables that can be used during the generation of execution results of the Test 
Case with a specific Robot Listener class to automatically generate a GitHub Issue in the GitHub repository of the 
Context Broker of a failed Test Case. In case that you cannot or do not want to use this functionality, delete those 
variables from the file.

As an explanation of the process, the GitHub Issue will be created if there is no other issue in the repository with
the same name or there is an issue with the same name, but it is not closed.

In order to create these issues, the [GitHub REST API](https://docs.github.com/en/rest) is used. For this purpose, 
the authentication process is using a personal access token. The needed variables are the following:
* `github_owner`: Your GitHub user account. 
* `github_broker_repo` : The corresponding URL of the Context Broker repository.
* `github_token` : Your personal access token. Please take a look to the GitHub documentation if you want to generate 
your own [personal access token](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens).

## Execute the NGSI-LD Test Suite

@@ -131,6 +145,103 @@ test launch command followed by the file name.
> > .venv\scripts\deactivate.bat
> ```

## Test Suite Management (tsm)

The `tsm` script is designed to facilitate the selection and execution of the Test Suite, especially if not all the 
endpoints of the API have been implemented for a specific Context Broker. This script provides a set of commands 
to enable or disable Test Cases, Robot Test Suites or Collections (Robot Test Suite Groups), visualize the current 
status of the different Test Cases, execute the selected Test Cases and perform other related operations as described 
below.

The `tsm` script generates a pickle file named `tsm.pkl`, which stores the tuples list corresponding to each Test Case 
with the following information:

    (switch, running status, Test Case long name)

where the values and their meaning are the following:
* **switch**:
  * `ON`: the Test Case is on, which means that it will be executed by the script.
  * `OFF`: the Test Case is off, and therefore is not selected to be executed by the script.
  * `MISSING`: the Test Case is not anymore in the Test Suite Structure. An update operation should be run to update 
  the `tsm.pkl` with the current set of available Test Cases in the filesystem.
  * `NEW`:  new Test Case discovered by the tsm after an update operation.
* **status**:
  * `PASSED`: the robot framework executes the Test Case with result PASS.
  * `FAILED`: the robot framework executes the Test Case with result FAIL.
  * `PENDING`: the Test Case is pending to be executed by the robot framework.
* **test case long name**: the Test Case long name set by robot framework based on the Robot Test Suite number and 
the Test Case name (e.g., NGSILD.032 02.032_02_01 Delete Unknown Subscription)

### Installation
The `tsm` script is integrated with arguments auto-completion for bash, therefore it is needed the installation of 
the argcomplete python package on your system:

    pip install argcomplete

and to enable the completion after the installation executing the following command:

    activate-global-python-argcomplete 

and then 

    eval "$(register-python-argcomplete tsm)"

Now, it is possible to autocomplete the commands and show possible options executing the script. Also –help argument 
is available to obtain more information about the options.

### Execution

The tsm cases update command updates the `tsm.pkl` file with all the Robot Test Suite under the local path. If the 
pickle file does not exist, it is created. After the creation of this file, it is possible to execute the script 
to maintain and run the selected Test Cases from the pickle file. The list of commands is the following:

* **Test Cases (cases)**
  * Switch ON Test Cases
  
        tsm cases on [test_cases]
  
  * Switch OFF Test Cases

        tsm cases off [test_cases]
        tsm cases off "NGSILD.032 01.032_01_02 InvalidId"
  
  * List Test Cases based on the specific flag.
        
        tsm cases list [on, off, missing, new, passed, failed, pending, all]
        tsm cases list ./TP/NGSI-LD/CommonBehaviours
  
  * Run Test Cases that are enabled

        tsm cases run [on, off, missing, new, passed, failed, pending, [test_cases]]
        tsm cases run NGSILD.048\ 01.048_01_06\ Endpoint\ post\ /temporal/entities/
        tsm cases run pending
      
  * Update the pickle file with the current Test Cases

        tsm cases update
       
  * Clean Test Cases, remove the Test Cases that were marked as MISSING

        tsm cases clean

* **Robot Test Suites (suites)**
  * Switch ON Robot Test Suites

        tsm suites on [suites]
       
  * Switch OFF Robot Test Suites

        tsm suites off [suites]

* **Test Collections (collections)**
  * Switch ON Test Collections

        tsm collections on [collections]
        tsm collections on ./TP/NGSI-LD/CommonBehaviours

  * Switch OFF Test Collections

        tsm collections off [collections]

## Contribute to the Test Suite

@@ -324,7 +435,18 @@ And, if you want to generate a documentation for the Test Cases:

```$ python3  -m robot.testdoc  TP/NGSI-LD  api_docs/TestCases.html```

### Coding Style of Test Suites
## Generate output file details only for failed tests

It is possible to generate a report only for the failed tests through the use of a specific listener in the execution
of the robot framework. For example, if you want to execute the test suite number 043 and generate the report, you can
execute the following command:

```robot  --listener libraries/ErrorListener.py --outputdir ./results ./TP/NGSI-LD/CommonBehaviours/043.robot```

It will generate a specific `errors.log` file into the `results` folder with the description of the different steps 
developed and the mismatched observed on them.

## Coding Style of Test Suites

And if you want to tidy (code style) the Test Suites:

+175 −17
Original line number Diff line number Diff line
from os.path import join
from os import getcwd
from os.path import join, splitext, exists
from os import getcwd, remove
from re import compile, match, MULTILINE
from json import loads, dumps
from http import HTTPStatus
from convertMD import Markdown
from githubIssue import GitHubIssue
from robot.running.context import EXECUTION_CONTEXTS


def __get_header__(dictionary: dict, key: str) -> str:
    result = str()
    try:
        result = f'  {key}: {dictionary["headers"][key]}\n'
        return result
    except KeyError:
        pass


def __get_status_meaning__(status_code):
    try:
        status = HTTPStatus(status_code)
        return status.phrase
    except ValueError:
        return "Unknown status code"


def __is_string_dict__(string: str) -> bool:
    try:
        json_object = loads(string)
        if isinstance(json_object, dict):
            return True
    except ValueError:
        pass
    return False


def __flatten_concatenation__(matrix):
    flat_list = []
    for row in matrix:
        if isinstance(row, str):
            flat_list.append(row)
        else:
            flat_list += row

    return flat_list


def __get_body__(dictionary: dict):
    result = str()
    if dictionary is None:
        result = '  No body\n'
    else:
        result = dumps(dictionary, indent=2)
        result = (result.replace('\n', '\n  ')
                  .replace("{", "  {")
                  .replace("[", "  [") + '\n')

    return result


class ErrorListener:
    ROBOT_LISTENER_API_VERSION = 2

    def __init__(self, filename='errors.log'):
        self.cwd = getcwd()
        out_path = join('results', filename)
        self.max_length = 150
        self.outfile = open(out_path, 'w')
        self.tests = str()
        self.filename_md = None
        self.filename_log = None
        self.cwd = None
        self.outfile = None
        self.previous_content = str()
        self.filename = filename

        self.max_length_suite = 150
        self.max_length_case = 80
        self.tests = list()
        self.suite_name = str()
        self.rx_dict = {
            'variables': compile('^\${.*$|^\&{.*$|^\@{.*'),
@@ -23,31 +84,128 @@ class ErrorListener:
                                  '^CONNECT.*(Request|Response).*$|'
                                  '^OPTIONS.*(Request|Response).*$|'
                                  '^TRACE.*(Request|Response).*$|'
                                  '^PATCH.*(Request|Response).*$', MULTILINE)
                                  '^PATCH.*(Request|Response).*$', MULTILINE),
            'length_log': compile('^Length is \d+$')
        }

    def generate_file_path(self):
        if self.outfile is None:
            # This is the first time that we execute the test therefore we configure the filenames
            ns = EXECUTION_CONTEXTS.current
            output_dir = ns.variables.current.store.data['OUTPUT_DIR']

            basename = splitext(self.filename)[0]
            self.filename_log = join(output_dir, self.filename)
            self.filename_md = join(output_dir, f'{basename}.md')

            self.cwd = getcwd()
            self.outfile = open(self.filename_log, 'w')

            # Check if a previous version of the markdown file exists in the folder, then we delete it in order
            # not to append to this file
            if exists(self.filename_md):
                remove(self.filename_md)

    def start_suite(self, name, attrs):
        self.generate_file_path()

        self.suite_name = attrs['source'].replace(self.cwd, '')[1:].replace('.robot', '').replace('/', ".")
        self.outfile.write(f'{"=" * self.max_length}\n')

        if attrs['doc'] != '':
            self.outfile.write(f'{"=" * self.max_length_suite}\n')
            self.outfile.write(f'{self.suite_name} :: {attrs["doc"]}\n')
        self.outfile.write(f'{"=" * self.max_length}\n')
            self.outfile.write(f'{"=" * self.max_length_suite}\n')

    def start_test(self, name, attrs):
        self.tests = f"{name} :: {attrs['doc']}\n"
        self.tests.append(f"\n\n{name}\n")
        self.tests.append(f'{"=" * self.max_length_case}\n')

    def end_test(self, name, attrs):
        if attrs['status'] != 'PASS':
            self.outfile.write(self.tests)
            self.outfile.write(f'| FAIL |\n{attrs["message"]}\n')
            self.outfile.write(f'{"-" * self.max_length}\n')
            flat_list = __flatten_concatenation__(matrix=self.tests)
            [self.outfile.write(x) for x in flat_list if x is not None]
            self.tests.clear()

    def end_suite(self, name, attrs):
        self.outfile.write(f'{self.suite_name} :: {attrs["doc"]}... | {attrs["status"]} |\n{attrs["statistics"]}\n')
        self.outfile.write('\n\n\n')
        self.outfile.close()

        try:
            # If there was an error, generate the markdown content and upload an issue in the corresponding
            # GitHub Repository
            md = Markdown(filename=self.filename_log, previous_content=self.previous_content)
            md.get_names()
            # md.generate_md()
            self.previous_content = md.save_file(filename=self.filename_md)

            # Check if we have defined the GitHub parameters in the variables.py file, if this is the case upload
            # gh = GitHubIssue(issue_title=f'{attrs["longname"]} - {attrs["doc"]}', issue_content=md.get_markdown())

            # gh.create_issues()
        except KeyError as err:
            print(f'\n[ERROR] Unexpected {err=}, {type(err)=} in TC {self.suite_name}\n\n')
        except IndexError as err:
            print(f'\n[ERROR] Unexpected {err=}, {type(err)=} in TC {self.suite_name}\n\n')
        except Exception as err:
            print(f'\n[ERROR] Unexpected {err=}, {type(err)=} in TC {self.suite_name}\n\n')

        # We need to reopen the file in case that we are executing several TCs
        self.outfile = open(self.filename_log, 'a')

    def log_message(self, msg):
        if (not match(pattern=self.rx_dict['variables'], string=msg['message']) and
                not match(pattern=self.rx_dict['http_verbs'], string=msg['message'])):
            self.outfile.write(f'{msg["message"]}\n')
            self.tests.append(self.__get_message__(msg["message"]))

    def close(self):
        self.outfile.write('\n\n\n')
        self.outfile.close()

    def __get_message__(self, message: str) -> str:
        result = str()
        if message == 'Request ->':
            result = f'\n\nRequest:\n{"-" * self.max_length_case}\n'
        elif message == 'Response ->':
            result = f'\n\nResponse:\n{"-" * self.max_length_case}\n'
        elif __is_string_dict__(string=message):
            result = self.__generate_pretty_output__(data=message)
        elif message[0] == '\n':
            # This is the title of a test case operation
            result = message
        elif message == 'Dictionary comparison failed with -> ':
            result == None
        elif match(pattern=self.rx_dict['length_log'], string=message) is None:
            result = f'\nMismatch:\n{"-" * self.max_length_case}\n{message}\n'

        return result

    def __generate_pretty_output__(self, data: str) -> list:
        data = loads(data)

        output = list()

        received_header_keys = data['headers'].keys()

        if 'User-Agent' in received_header_keys:
            # User-Agent is a Request Header, therefore we generate the request header
            output.append(f'  {data["method"]} {data["url"]}\n')

            [output.append(__get_header__(dictionary=data, key=x)) for x in list(received_header_keys)]

            output.append('\n')

            output.append(__get_body__(dictionary=data['body']))
        else:
            # This is a Response header
            # robotframework-requests is based on python request, so it is using HTTP/1.1
            output.append(f'  HTTP/1.1 {data["status_code"]} {__get_status_meaning__(data["status_code"])}\n')

            [output.append(__get_header__(dictionary=data, key=x)) for x in list(received_header_keys)]

            output.append(f'  Date: REGEX(. *)\n')

            output.append('\n')

            output.append(__get_body__(dictionary=data['body']))

        return output

libraries/convertMD.py

0 → 100644
+114 −0
Original line number Diff line number Diff line
from re import compile, match, findall, MULTILINE
from difflib import SequenceMatcher


def get_string_difference(string1: str, string2: str) -> str:
    differ = SequenceMatcher(None, string1, string2)
    differences = differ.get_opcodes()
    diff_string = ""

    for tag, i1, i2, j1, j2 in differences:
        if tag == 'delete' or tag == 'replace':
            diff_string += string1[i1:i2]
        elif tag == 'insert' or tag == 'replace':
            diff_string += string2[j1:j2]

    return diff_string


class Markdown:
    def __init__(self, filename: str, previous_content: str):
        # Read the content of the input file
        with open(filename, 'r') as file:
            self.content = file.read()
        file.close()

        # Initial previous content
        if previous_content != '':
            # If there was a previous content in the file, take the difference to do the process
            self.content = get_string_difference(string1=previous_content, string2=self.content)

        self.data = {
            "suite": str(),
            "cases": list(),
            "steps": list()
        }

        self.markdown_content = str()

    def get_names(self):
        pattern1 = compile('^(\S+.*)$', MULTILINE)

        aux = findall(pattern=pattern1, string=self.content)

        special_lines = ['Response:', 'Request:', 'Mismatch:', f'{"=" * 150}', f'{"=" * 80}', f'{"-" * 80}']
        xs = [x for x in aux if x not in special_lines]

        prefixes_to_remove = ["Item ", "+ ", "- ", "Value of ", "HTTP status code", "HTTPError:", "AttributeError:"]
        xs = [item for item in xs if not any(item.startswith(prefix) for prefix in prefixes_to_remove)]

        # Get the name of the Test Suite
        self.data["suite"] = xs[0]

        # Get the names of the Test Cases
        try:
            pattern = r"\d{3}\w+"
            self.data["cases"] = [item for item in xs if match(pattern, item)]
        except IndexError as err:
            print(f'\n[ERROR] Unexpected {err=}, {type(err)=} in TC {self.suite_name}\n\n')

        # Get the rest of values -> Steps
        # Get items from listA not present in listB and not equal to exclude_string
        self.data['steps'] = [item for item in xs if item not in self.data['cases'] and item != self.data['suite']]
        self.data['steps'] = list(set(self.data['steps']))

    def generate_md(self):
        # Replace the title of the Test Suite
        self.markdown_content = self.content
        self.markdown_content = (
            self.markdown_content.replace(f'{"=" * 150}\n{self.data["suite"]}\n{"=" * 150}', f'# {self.data["suite"]}'))

        # Replace the name of the Test Cases
        for x in self.data['cases']:
            self.markdown_content = (
                self.markdown_content.replace(f'{x}\n{"=" * 80}\n', f'```\n## {x}\n'))

        # Replace Request, Response, and Mismatch
        self.markdown_content = (self.markdown_content.replace(f'Request:\n{"-" * 80}', '#### Request:\n```')
                                 .replace(f'Response:\n{"-" * 80}', '```\n\n#### Response:\n```')
                                 .replace(f'Mismatch:\n{"-" * 80}', '```\n\n#### Mismatch:\n```'))

        # Replace the name of the steps
        for x in self.data['steps']:
            self.markdown_content = (
                self.markdown_content.replace(f'{x}\n', f'```\n### {x}\n'))

        # Final steps, correct the code style for the title of the Test Cases
        # Define patterns and replacement strings
        index = True
        for x in self.data['cases']:
            if index:
                self.markdown_content = (
                    self.markdown_content.replace(f'```\n## {x}\n\n```\n', f'## {x}\n\n'))
                index = False
            else:
                self.markdown_content = (
                    self.markdown_content.replace(f'```\n## {x}\n\n```\n', f'```\n## {x}\n\n'))

        # If the final number of "```" is odd, means that we need to close the last code section
        # this is a workaround to close the last section of code if this is keep open
        count = self.markdown_content.count("```")
        if count % 2 == 1:
            print(True)
            self.markdown_content = f"{self.markdown_content}\n```"

    def save_file(self, filename: str):
        # Write the Markdown content to the output file
        with open(filename, 'a') as file:
            file.write(self.markdown_content)
        file.close()

        return self.content

    def get_markdown(self) -> str:
        return self.markdown_content
+148 −0

File added.

Preview size limit exceeded, changes collapsed.

+5 −0
Original line number Diff line number Diff line
@@ -5,3 +5,8 @@ notification_server_host = '0.0.0.0'
notification_server_port = 8085
context_source_host = '0.0.0.0'
context_source_port = 8086

# GitHub repository details
# github_owner = 'your_github_username'
# github_broker_repo = 'context_broker_repository'
# github_token = 'your_github_token'
Loading