You can obtain the IP address of the Docker host with ``` ifconfig docker0 | grep inet | awk '{print $2}' ```, and that IP is the one that you need to use for
notification purposes.
Optionally, there are some extra variables that can be used during the generation of the execution results of the Test
Cases: a specific Robot Listener class automatically creates a GitHub Issue in the GitHub repository of the
Context Broker for a failed Test Case. If you cannot or do not want to use this functionality, delete those
variables from the file.
As an explanation of the process, the GitHub Issue will be created only if the repository does not already contain an
open issue with the same name; if an issue with the same name exists but is closed, a new one will be created.
In order to create these issues, the [GitHub REST API](https://docs.github.com/en/rest) is used. For this purpose,
authentication is performed with a personal access token. The needed variables are the following:
* `github_owner`: Your GitHub user account.
* `github_broker_repo` : The corresponding URL of the Context Broker repository.
* `github_token` : Your personal access token. Please refer to the GitHub documentation if you want to generate
your own [personal access token](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens).
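As an illustration of the underlying REST call, the following sketch (hypothetical code, not the Test Suite's own implementation; the variable values are placeholders) builds the request that creates such an issue:

```python
# Placeholders mirroring the entries expected in variables.py
github_owner = 'your_github_username'
github_broker_repo = 'context_broker_repository'
github_token = 'your_github_token'

def build_issue_request(title: str, body: str):
    # POST /repos/{owner}/{repo}/issues with token authentication
    url = f'https://api.github.com/repos/{github_owner}/{github_broker_repo}/issues'
    headers = {
        'Accept': 'application/vnd.github.v3+json',
        'Authorization': f'Token {github_token}'
    }
    data = {'title': title, 'body': body}
    return url, headers, data

# Sending the request (requires the requests package):
#   from requests import post
#   url, headers, data = build_issue_request('Failed TC', '...details...')
#   response = post(url=url, headers=headers, json=data)  # 201 Created on success
```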
## Execute the NGSI-LD Test Suite
> > .venv\scripts\deactivate.bat
> ```
## Test Suite Management (tsm)
The `tsm` script is designed to facilitate the selection and execution of the Test Suite, especially if not all the
endpoints of the API have been implemented for a specific Context Broker. This script provides a set of commands
to enable or disable Test Cases, Robot Test Suites or Collections (Robot Test Suite Groups), visualize the current
status of the different Test Cases, execute the selected Test Cases and perform other related operations as described
below.
The `tsm` script generates a pickle file named `.tsm.pkl`, which stores a list of tuples, one per Test Case,
with the following information:
(switch, running status, Test Case long name)
where the values and their meaning are the following:
* **switch**:
* `ON`: the Test Case is on, which means that it will be executed by the script.
* `OFF`: the Test Case is off, and therefore is not selected to be executed by the script.
* `MISSING`: the Test Case is no longer present in the Test Suite structure. An update operation should be run to update
the `.tsm.pkl` file with the current set of available Test Cases in the filesystem.
* `NEW`: new Test Case discovered by the tsm after an update operation.
* **status**:
* `PASSED`: Robot Framework executed the Test Case with result PASS.
* `FAILED`: Robot Framework executed the Test Case with result FAIL.
* `PENDING`: the Test Case has not been executed by Robot Framework yet.
* **test case long name**: the Test Case long name set by robot framework based on the Robot Test Suite number and
the Test Case name (e.g., NGSILD.032 02.032_02_01 Delete Unknown Subscription)
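For reference, the stored list can be inspected with a few lines of Python. This is a sketch assuming the tuple layout described above; `summarize` is an illustrative helper, not part of `tsm`:

```python
import pickle

def load_test_cases(path='.tsm.pkl'):
    # The pickle file contains a list of (switch, status, long name) tuples
    with open(path, 'rb') as file:
        return pickle.load(file)

def summarize(test_cases):
    # Count the Test Cases per switch value (ON / OFF / MISSING / NEW)
    counts = {}
    for switch, status, longname in test_cases:
        counts[switch] = counts.get(switch, 0) + 1
    return counts
```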
### Installation
The `tsm` script is integrated with argument auto-completion for bash, therefore you need to install the
argcomplete Python package on your system:
pip install argcomplete
and enable the completion after the installation by executing the following command:
activate-global-python-argcomplete
and then
eval "$(register-python-argcomplete tsm)"
Now, it is possible to autocomplete the commands and show the possible options when executing the script. The `--help`
argument is also available to obtain more information about the options.
### Execution
The `tsm cases update` command updates the `.tsm.pkl` file with all the Robot Test Suites under the local path. If the
pickle file does not exist, it is created. After the creation of this file, it is possible to execute the script
to maintain and run the selected Test Cases from the pickle file. The list of commands is the following:
* **Test Cases (cases)**
* Switch ON Test Cases
tsm cases on [test_cases]
* Switch OFF Test Cases
tsm cases off [test_cases]
tsm cases off "NGSILD.032 01.032_01_02 InvalidId"
* List Test Cases based on the specific flag.
tsm cases list [on, off, missing, new, passed, failed, pending, all]
tsm cases list ./TP/NGSI-LD/CommonBehaviours
* Run Test Cases that are enabled
tsm cases run [on, off, missing, new, passed, failed, pending, [test_cases]]
tsm cases run NGSILD.048\ 01.048_01_06\ Endpoint\ post\ /temporal/entities/
tsm cases run pending
* Update the pickle file with the current Test Cases
tsm cases update
* Clean Test Cases, remove the Test Cases that were marked as MISSING
tsm cases clean
* **Robot Test Suites (suites)**
* Switch ON Robot Test Suites
tsm suites on [suites]
* Switch OFF Robot Test Suites
tsm suites off [suites]
* **Test Collections (collections)**
* Switch ON Test Collections
tsm collections on [collections]
tsm collections on ./TP/NGSI-LD/CommonBehaviours
* Switch OFF Test Collections
tsm collections off [collections]
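Internally, switching a Test Case on or off amounts to rewriting its tuple in the pickle file. A minimal sketch of that update (illustrative names, not the script's own code):

```python
def set_switch(test_cases, longname, new_switch):
    # Return a new tuples list where the given Test Case gets the new switch value;
    # every other tuple is kept unchanged
    return [(new_switch, status, name) if name == longname else (switch, status, name)
            for switch, status, name in test_cases]
```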
## Contribute to the Test Suite
And, if you want to generate documentation for the Test Cases:
```$ python3 -m robot.testdoc TP/NGSI-LD api_docs/TestCases.html```
## Generate output file details only for failed tests
It is possible to generate a report only for the failed tests through the use of a specific listener during the execution
of Robot Framework. For example, if you want to execute the Test Suite number 043 and generate the report, you can
execute the following command:
```robot --listener libraries/ErrorListener.py --outputdir ./results ./TP/NGSI-LD/CommonBehaviours/043.robot```
It will generate a specific `errors.log` file in the `results` folder with the description of the different steps
executed and the mismatches observed in them.
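The mechanism relies on the Robot Framework listener interface (version 2): the listener receives the status of every finished test and only keeps the failed ones. A stripped-down sketch of that idea (hypothetical code, much simpler than the real `ErrorListener`):

```python
class MinimalErrorListener:
    # Robot Framework calls these methods during the execution
    ROBOT_LISTENER_API_VERSION = 2

    def __init__(self, filename='errors.log'):
        self.filename = filename
        self.lines = []

    def end_test(self, name, attrs):
        # 'attrs' is filled in by Robot Framework; 'status' is PASS, FAIL or SKIP
        if attrs['status'] != 'PASS':
            self.lines.append(f"| FAIL | {name}: {attrs['message']}\n")

    def close(self):
        # Called once at the end of the whole execution
        with open(self.filename, 'w') as outfile:
            outfile.writelines(self.lines)
```

Such a class would be attached with `--listener`, exactly as `ErrorListener.py` in the command above.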
## Coding Style of Test Suites
And if you want to tidy (code style) the Test Suites:
......
class ParseVariablesFile:
# Read the contents of the file
file_content = file.read()
# Generate a list of lines from the file content
file_content = file_content.split('\n')
# Dismiss the blank lines and the lines starting with # -> comments
file_content = [x for x in file_content if x != '' and x[0] != '#']
# Extract the key = value format of the variables (split only on the first '=')
file_content = [x.split('=', 1) for x in file_content]
# Delete the ' characters from the values and strip the surrounding blank spaces
self.variables = {x[0].strip(): x[1].replace("'", "").strip() for x in file_content}
def get_variable(self, variable: str) -> str:
......
from os.path import join, splitext, exists
from os import getcwd, remove
from re import compile, match, MULTILINE
from json import loads, dumps
from http import HTTPStatus
from convertMD import Markdown
from githubIssue import GitHubIssue
from robot.running.context import EXECUTION_CONTEXTS
def __get_header__(dictionary: dict, key: str) -> str:
try:
return f' {key}: {dictionary["headers"][key]}\n'
except KeyError:
# The header is not present; callers filter out the resulting None
return None
def __get_status_meaning__(status_code):
try:
status = HTTPStatus(status_code)
return status.phrase
except ValueError:
return "Unknown status code"
def __is_string_dict__(string: str) -> bool:
try:
json_object = loads(string)
if isinstance(json_object, dict):
return True
except ValueError:
pass
return False
def __flatten_concatenation__(matrix):
flat_list = []
for row in matrix:
if isinstance(row, str):
flat_list.append(row)
else:
flat_list += row
return flat_list
def __get_body__(dictionary: dict):
result = str()
if dictionary is None:
result = ' No body\n'
else:
result = dumps(dictionary, indent=2)
result = (result.replace('\n', '\n ')
.replace("{", " {")
.replace("[", " [") + '\n')
return result
class ErrorListener:
ROBOT_LISTENER_API_VERSION = 2
def __init__(self, filename='errors.log'):
self.filename_md = None
self.filename_log = None
self.cwd = None
self.outfile = None
self.previous_content = str()
self.filename = filename
self.max_length_suite = 150
self.max_length_case = 80
self.tests = list()
self.suite_name = str()
self.rx_dict = {
'variables': compile(r'^\${.*$|^\&{.*$|^\@{.*'),
'^CONNECT.*(Request|Response).*$|'
'^OPTIONS.*(Request|Response).*$|'
'^TRACE.*(Request|Response).*$|'
'^PATCH.*(Request|Response).*$', MULTILINE),
'length_log': compile(r'^Length is \d+$')
}
def generate_file_path(self):
if self.outfile is None:
# This is the first time that we execute the test therefore we configure the filenames
ns = EXECUTION_CONTEXTS.current
output_dir = ns.variables.current.store.data['OUTPUT_DIR']
basename = splitext(self.filename)[0]
self.filename_log = join(output_dir, self.filename)
self.filename_md = join(output_dir, f'{basename}.md')
self.cwd = getcwd()
self.outfile = open(self.filename_log, 'w')
# Check if a previous version of the markdown file exists in the folder; if so, delete it
# so that we do not append to the old content
if exists(self.filename_md):
remove(self.filename_md)
def start_suite(self, name, attrs):
self.generate_file_path()
self.suite_name = attrs['source'].replace(self.cwd, '')[1:].replace('.robot', '').replace('/', ".")
if attrs['doc'] != '':
self.outfile.write(f'{"=" * self.max_length_suite}\n')
self.outfile.write(f'{self.suite_name} :: {attrs["doc"]}\n')
self.outfile.write(f'{"=" * self.max_length_suite}\n')
def start_test(self, name, attrs):
self.tests.append(f"\n\n{name}\n")
self.tests.append(f'{"=" * self.max_length_case}\n')
def end_test(self, name, attrs):
if attrs['status'] != 'PASS':
flat_list = __flatten_concatenation__(matrix=self.tests)
[self.outfile.write(x) for x in flat_list if x is not None]
self.tests.clear()
def end_suite(self, name, attrs):
self.outfile.write(f'{self.suite_name} :: {attrs["doc"]}... | {attrs["status"]} |\n{attrs["statistics"]}\n')
self.outfile.write('\n\n\n')
self.outfile.close()
try:
# If there was an error, generate the markdown content and upload an issue in the corresponding
# GitHub Repository
md = Markdown(filename=self.filename_log, previous_content=self.previous_content)
md.get_names()
# md.generate_md()
self.previous_content = md.save_file(filename=self.filename_md)
# Check if we have defined the GitHub parameters in the variables.py file, if this is the case upload
# gh = GitHubIssue(issue_title=f'{attrs["longname"]} - {attrs["doc"]}', issue_content=md.get_markdown())
# gh.create_issues()
except Exception as err:
print(f'\n[ERROR] Unexpected {err=}, {type(err)=} in TC {self.suite_name}\n\n')
# We need to reopen the file in case that we are executing several TCs
self.outfile = open(self.filename_log, 'a')
def log_message(self, msg):
if (not match(pattern=self.rx_dict['variables'], string=msg['message']) and
not match(pattern=self.rx_dict['http_verbs'], string=msg['message'])):
self.tests.append(self.__get_message__(msg["message"]))
def close(self):
self.outfile.write('\n\n\n')
self.outfile.close()
def __get_message__(self, message: str) -> str:
result = str()
if message == 'Request ->':
result = f'\n\nRequest:\n{"-" * self.max_length_case}\n'
elif message == 'Response ->':
result = f'\n\nResponse:\n{"-" * self.max_length_case}\n'
elif __is_string_dict__(string=message):
result = self.__generate_pretty_output__(data=message)
elif message[0] == '\n':
# This is the title of a test case operation
result = message
elif message == 'Dictionary comparison failed with -> ':
result = None
elif match(pattern=self.rx_dict['length_log'], string=message) is None:
result = f'\nMismatch:\n{"-" * self.max_length_case}\n{message}\n'
return result
def __generate_pretty_output__(self, data: str) -> list:
data = loads(data)
output = list()
received_header_keys = data['headers'].keys()
if 'User-Agent' in received_header_keys:
# User-Agent is a Request Header, therefore we generate the request header
output.append(f' {data["method"]} {data["url"]}\n')
for x in received_header_keys:
output.append(__get_header__(dictionary=data, key=x))
output.append('\n')
output.append(__get_body__(dictionary=data['body']))
else:
# This is a Response header
# robotframework-requests is based on Python requests, so it is using HTTP/1.1
output.append(f' HTTP/1.1 {data["status_code"]} {__get_status_meaning__(data["status_code"])}\n')
for x in received_header_keys:
output.append(__get_header__(dictionary=data, key=x))
output.append(' Date: REGEX(.*)\n')
output.append('\n')
output.append(__get_body__(dictionary=data['body']))
return output
from re import compile, match, findall, MULTILINE
from difflib import SequenceMatcher
def get_string_difference(string1: str, string2: str) -> str:
differ = SequenceMatcher(None, string1, string2)
differences = differ.get_opcodes()
diff_string = ""
for tag, i1, i2, j1, j2 in differences:
if tag == 'delete':
diff_string += string1[i1:i2]
elif tag == 'insert' or tag == 'replace':
diff_string += string2[j1:j2]
return diff_string
class Markdown:
def __init__(self, filename: str, previous_content: str):
# Read the content of the input file
with open(filename, 'r') as file:
self.content = file.read()
# Handle the previous content
if previous_content != '':
# If there was previous content in the file, take the difference so that only the new part is processed
self.content = get_string_difference(string1=previous_content, string2=self.content)
self.data = {
"suite": str(),
"cases": list(),
"steps": list()
}
self.markdown_content = str()
def get_names(self):
pattern1 = compile(r'^(\S+.*)$', MULTILINE)
aux = findall(pattern=pattern1, string=self.content)
special_lines = ['Response:', 'Request:', 'Mismatch:', f'{"=" * 150}', f'{"=" * 80}', f'{"-" * 80}']
xs = [x for x in aux if x not in special_lines]
prefixes_to_remove = ["Item ", "+ ", "- ", "Value of ", "HTTP status code", "HTTPError:", "AttributeError:"]
xs = [item for item in xs if not any(item.startswith(prefix) for prefix in prefixes_to_remove)]
# Get the name of the Test Suite
self.data["suite"] = xs[0]
# Get the names of the Test Cases
try:
pattern = r"\d{3}\w+"
self.data["cases"] = [item for item in xs if match(pattern, item)]
except IndexError as err:
print(f'\n[ERROR] Unexpected {err=}, {type(err)=} in Test Suite {self.data["suite"]}\n\n')
# Get the rest of the values -> Steps
# Keep the items that are neither Test Case names nor the Test Suite name
self.data['steps'] = [item for item in xs if item not in self.data['cases'] and item != self.data['suite']]
self.data['steps'] = list(set(self.data['steps']))
def generate_md(self):
# Replace the title of the Test Suite
self.markdown_content = self.content
self.markdown_content = (
self.markdown_content.replace(f'{"=" * 150}\n{self.data["suite"]}\n{"=" * 150}', f'# {self.data["suite"]}'))
# Replace the name of the Test Cases
for x in self.data['cases']:
self.markdown_content = (
self.markdown_content.replace(f'{x}\n{"=" * 80}\n', f'```\n## {x}\n'))
# Replace Request, Response, and Mismatch
self.markdown_content = (self.markdown_content.replace(f'Request:\n{"-" * 80}', '#### Request:\n```')
.replace(f'Response:\n{"-" * 80}', '```\n\n#### Response:\n```')
.replace(f'Mismatch:\n{"-" * 80}', '```\n\n#### Mismatch:\n```'))
# Replace the name of the steps
for x in self.data['steps']:
self.markdown_content = (
self.markdown_content.replace(f'{x}\n', f'```\n### {x}\n'))
# Final steps, correct the code style for the title of the Test Cases
first = True
for x in self.data['cases']:
if first:
self.markdown_content = (
self.markdown_content.replace(f'```\n## {x}\n\n```\n', f'## {x}\n\n'))
first = False
else:
self.markdown_content = (
self.markdown_content.replace(f'```\n## {x}\n\n```\n', f'```\n## {x}\n\n'))
# If the final number of "```" markers is odd, the last code section was left open
# and we need to close it
count = self.markdown_content.count("```")
if count % 2 == 1:
self.markdown_content = f"{self.markdown_content}\n```"
def save_file(self, filename: str):
# Write the Markdown content to the output file
with open(filename, 'a') as file:
file.write(self.markdown_content)
return self.content
def get_markdown(self) -> str:
return self.markdown_content
from requests import post, get
from re import finditer
from json import loads
try:
from resources.variables import github_owner, github_broker_repo, github_token
except ImportError:
# Some of the variables were not defined, therefore we cannot execute the operation
classError = True
else:
classError = False
class GitHubIssue:
def __init__(self, issue_title: str, issue_content: str):
if classError:
# Some GitHub parameters are not defined, therefore this function has no effect
print("\nSome GitHub parameters were not defined in variables.py")
print("Expected parameters: github_owner, github_broker_repo, github_token")
return
else:
# Get the values of the parameter from the variables.py file
# GitHub repository details
self.url_create = f'https://api.github.com/repos/{github_owner}/{github_broker_repo}/issues'
self.issue_title = issue_title
self.issue_content = issue_content
self.test_cases = list()
self.test_cases_title = list()
def create_issues(self):
if classError:
# Some GitHub parameters are not defined, therefore this function has no effect
print("\nSome GitHub parameters were not defined in variables.py")
print("Expected parameters: github_owner, github_broker_repo, github_token")
return
else:
# Request body: the issue content needs to be split into the different Test Cases in order to stay
# under the body maximum of 65536 characters
self.generate_test_cases_info()
for i in range(len(self.test_cases_title)):
# We need to check that the issue was not already created previously:
# if the issue was created previously and is still open, we do not create it again;
# otherwise, we create the issue
# Obtain the extended title of the issue
issue_title = f'{self.issue_title} {self.test_cases_title[i]}'
# Check the issue
if self.check_duplicate_issue(issue_title=issue_title):
print('\nDuplicate issue found!')
else:
self.create_issue(title=issue_title, body=self.test_cases[i])
def create_issue(self, title: str, body: str):
if classError:
# Some GitHub parameters are not defined, therefore this function has no effect
print("\nSome GitHub parameters were not defined in variables.py")
print("Expected parameters: github_owner, github_broker_repo, github_token")
return
else:
# Data of the issue, using the extended title so that the duplicate check matches
data = {
'title': title,
'body': body
}
# Request headers
headers = {
'Accept': 'application/vnd.github.v3+json',
'Authorization': f'Token {github_token}'
}
# Send POST request to create the issue
response = post(url=self.url_create, headers=headers, json=data)
# Check the response status code
if response.status_code == 201:
print('\nIssue created successfully.')
else:
print('\nFailed to create the issue.')
print(f'Response: {response.status_code} - {response.text}')
def generate_test_cases_info(self):
if classError:
# Some GitHub parameters are not defined, therefore this function has no effect
print("\nSome GitHub parameters were not defined in variables.py")
print("Expected parameters: github_owner, github_broker_repo, github_token")
return
else:
pattern = r'##\s*[0-9]{3}_[0-9]{2}_[0-9]{2}.*\n'  # Match the Test Case headings, e.g. '## 027_01_01 ...'
count = int()
indexes = list()
match = None
for match in finditer(pattern, self.issue_content):
count += 1
indexes.append(match.start())
if match:
title = self.issue_content[0:indexes[0]]
else:
raise KeyError("Search unsuccessful. It was expected that the names of the Test Cases start with "
"<ddd>_<dd>_<dd>, where d is a digit, e.g., 027_01_01")
# Get the list of Test Cases
for i in range(1, len(indexes) + 1):
self.test_cases_title.append(f'({self.issue_content[indexes[i-1]+3:indexes[i-1]+12]})')
if i < len(indexes):
self.test_cases.append(self.issue_content[indexes[i-1]:indexes[i]])
else:
self.test_cases.append(self.issue_content[indexes[i-1]:])
self.test_cases = [f'{title}\n\n{x}' for x in self.test_cases]
def check_duplicate_issue(self, issue_title):
if classError:
# Some GitHub parameters are not defined, therefore this function has no effect
print("\nSome GitHub parameters were not defined in variables.py")
print("Expected parameters: github_owner, github_broker_repo, github_token")
return
else:
# Generate the search query
query = f'repo:{github_owner}/{github_broker_repo} is:issue is:open in:title "{issue_title}"'
# Make the API request
response = get(
'https://api.github.com/search/issues',
params={'q': query}
)
# Check the response status code
if response.status_code == 200:
# Parse the JSON response
data = response.json()
# Check if any issues were found
if data['total_count'] > 0:
return True # Duplicate issue found
else:
raise Exception(loads(response.text)['errors'][0]['message'])
return False # No duplicate issue found
context_source_host = '0.0.0.0'
context_source_port = 8086
context_server_host = '0.0.0.0'
context_server_port = 8087
core_context = 'https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context-v1.6.jsonld'
# GitHub repository details
# github_owner = 'your_github_username'
# github_broker_repo = 'context_broker_repository'
# github_token = 'your_github_token'
#!/usr/bin/env python
# Documentation
# https://kislyuk.github.io/argcomplete/
#
#
#
# Installation
# pip install argcomplete
# activate-global-python-argcomplete
#
#
# In global completion mode, you don’t have to register each argcomplete-capable executable separately.
# Instead, the shell will look for the string PYTHON_ARGCOMPLETE_OK in the first 1024 bytes of any
# executable that it’s running completion for, and if it’s found, follow the rest of the argcomplete
# protocol as described above.
#
# Additionally, completion is activated for scripts run as python <script> and python -m <module>.
# If you’re using multiple Python versions on the same system, the version being used to run the
# script must have argcomplete installed.
#
# Register your Python application with your shell’s completion framework by running register-python-argcomplete
# eval "$(register-python-argcomplete tsm)"
# add the line to your .bashrc
#
#
# .tsm.pkl format
# (switch, running status, test case long name)
# switch: ON, OFF, MISSING, NEW
# status: PASSED, FAILED, PENDING
# test case long name: the test case long name set by robot framework based on the suite tree and the test case name
#
import argcomplete, argparse
import pickle
from robot.api import TestSuiteBuilder, TestSuite
import os
import sys
sys.path.append('libraries')
from ErrorListener import ErrorListener
############################################
# Global variables
NGSITEST_PKL = '.tsm.pkl'
############################################
# Helpers
class CustomConsoleListener:
ROBOT_LISTENER_API_VERSION = 3
def start_test(self, test, result):
msg = f"Test: {result.longname}"
print("\n\n")
print("*" * len(msg))
print_cyan(msg)
print("." * len(msg))
def end_test(self, test, result):
msg = f"[{result.status}] Test: {result.longname}"
print("\n")
print("." * len(msg))
if result.status == 'PASS':
print_green(msg)
else:
print_red(msg)
print("*" * len(msg))
print("\n\n")
def print_green(text):
print(f"\033[32m{text}\033[0m")
def print_orange(text):
print(f"\033[33m{text}\033[0m")
def print_dark_gray(text):
print(f"\033[90m{text}\033[0m")
def print_red(text):
print(f"\033[91m{text}\033[0m")
def print_blue(text):
print(f"\033[94m{text}\033[0m")
def print_magenta(text):
print(f"\033[95m{text}\033[0m")
def print_cyan(text):
print(f"\033[96m{text}\033[0m")
def print_bright_white(text):
print(f"\033[97m{text}\033[0m")
def get_suite(directory):
all_suites = TestSuite("NGSILD")
for root, dirs, files in os.walk(directory):
for file in files:
if file.endswith('.robot'):
file_path = os.path.join(root, file)
suite = TestSuiteBuilder().build(file_path)
all_suites.suites.append(suite)
return all_suites
def get_test_cases(suite):
test_cases = []
test_cases.extend(suite.tests)
for subsuite in suite.suites:
test_cases.extend(get_test_cases(subsuite))
return test_cases
def get_test_cases_by_directory(directory):
suites = get_suite(directory)
test_cases = get_test_cases(suites)
return test_cases
def save_list_to_file(data, file_path):
with open(file_path, 'wb') as file:
pickle.dump(data, file)
def load_list_from_file(file_path):
try:
# Check if the file exists
if not os.path.exists(file_path):
# If the file does not exist, create an empty file
with open(file_path, 'wb'):
pass
print(f"Creating pickle data file at '{file_path}'.")
# Check if the file is empty
if os.path.getsize(file_path) == 0:
return []
# Load data from the file
with open(file_path, 'rb') as file:
data = pickle.load(file)
return data
except FileNotFoundError:
raise FileNotFoundError(f"The file '{file_path}' does not exist.")
except ValueError:
print(f"The file '{file_path}' is empty.")
return []
except pickle.UnpicklingError as e:
print(f"Error unpickling data from '{file_path}': {e}")
return []
def filter_test_cases(fltr):
try:
test_cases = load_list_from_file(NGSITEST_PKL)
fltr_test_cases = [test for sw, st, test in test_cases if (sw.upper() in fltr) or (st.upper() in fltr)]
return fltr_test_cases
except FileNotFoundError as e:
print_red(f"Error: {e}")
return []
def filter_tuples(fltr):
try:
test_cases = load_list_from_file(NGSITEST_PKL)
fltr_tups = [(sw,st, ln) for sw, st, ln in test_cases if sw in fltr]
return fltr_tups
except FileNotFoundError as e:
print_red(f"Error: {e}")
return []
def get_suite_code(name):
parts = name.split('.')
if len(parts) >= 2:
return '.'.join(parts[:2])
else:
return name
def filter_test_suites(fltr):
try:
test_cases = load_list_from_file(NGSITEST_PKL)
fltr_tups = [(sw, st, ln) for sw, st, ln in test_cases if sw in fltr]
suite_codes = {get_suite_code(ln) for sw, st, ln in fltr_tups}
return list(suite_codes)
except FileNotFoundError as e:
print_red(f"Error: {e}")
return []
def print_test_case(tup):
sw, st, test = tup
if sw == 'ON' and st == 'PASSED':
print_green(tup)
elif sw == 'ON' and st == 'FAILED':
print_red(tup)
elif sw == 'ON':
print_bright_white(tup)
elif sw == 'OFF':
print_dark_gray(tup)
elif sw == 'NEW':
print_orange(tup)
elif sw == 'MISSING':
print_blue(tup)
else:
print(f"Unknown switch '{sw}'")
############################################
# Command Handlers
def on_cases(args):
all_test_cases = load_list_from_file(NGSITEST_PKL)
onable_switches = ['OFF','NEW']
result_test_cases = []
if 'iterative' in args.test_cases:
print('Entering iterative mode...')
for tup in all_test_cases:
sw, st, test = tup
if sw in onable_switches:
print_test_case(tup)
choice = input("Switch on (Y)es, (n)o: ").upper() or 'Y'
if choice == 'N':
result_test_cases.append(tup)
else:
result_test_cases.append(('ON', st, test))
else:
result_test_cases.append(tup)
elif 'all' in args.test_cases:
print('Switching on all onable test cases')
for tup in all_test_cases:
sw, st, test = tup
if sw in onable_switches:
print_test_case(tup)
result_test_cases.append(('ON', st, test))
else:
result_test_cases.append(tup)
else:
for tc_to_switch_on in args.test_cases:
print_dark_gray(f"Switching on: {tc_to_switch_on}")
result_test_cases = all_test_cases
for i, (sw, st, test) in enumerate(result_test_cases):
if test == tc_to_switch_on:
# Update the switch state to "ON"
result_test_cases[i] = ('ON', st, test)
print_green(f"Test case {test} switched on.")
# Save the updated list back to the file
save_list_to_file(result_test_cases, NGSITEST_PKL)
def off_cases(args):
all_test_cases = load_list_from_file(NGSITEST_PKL)
result_test_cases = []
if 'iterative' in args.test_cases:
print('Entering iterative mode...')
for tup in all_test_cases:
sw, st, test = tup
if sw in ['ON','NEW']:
print_test_case(tup)
choice = input("Switch off (Y)es, (n)o: ").upper() or 'Y'
if choice == 'N':
result_test_cases.append(tup)
else:
result_test_cases.append(('OFF', st, test))
else:
result_test_cases.append(tup)
elif 'all' in args.test_cases:
print('Switching off all offable test cases')
for tup in all_test_cases:
sw, st, test = tup
if sw in ['ON','NEW']:
print_test_case(tup)
result_test_cases.append(('OFF', st, test))
else:
result_test_cases.append(tup)
else:
for tc_to_switch_off in args.test_cases:
print_green(f"Switching off: {tc_to_switch_off}")
for i, (sw, st, test) in enumerate(all_test_cases):
if test == tc_to_switch_off:
# Update the switch state to "OFF"
all_test_cases[i] = ('OFF', st, test)
print_dark_gray(f"Test case {test} switched off.")
result_test_cases = all_test_cases
# Save the updated list back to the file
save_list_to_file(result_test_cases, NGSITEST_PKL)
def on_suites(args):
tups = load_list_from_file(NGSITEST_PKL)
result = []
for suite in args.suites:
for index, value in enumerate(tups):
sw, st, ln = value
if suite in ln:
tups[index] = ('ON', st, ln)
# Save the updated list back to the file
save_list_to_file(tups, NGSITEST_PKL)
def off_suites(args):
tups = load_list_from_file(NGSITEST_PKL)
result = []
for suite in args.suites:
for index, value in enumerate(tups):
sw, st, ln = value
if suite in ln:
tups[index] = ('OFF', st, ln)
# Save the updated list back to the file
save_list_to_file(tups, NGSITEST_PKL)
def on_collections(args):
# Get a list with test cases for each collection to on
test_cases=[]
for collection in args.collections:
test_cases.extend(get_test_cases_by_directory(collection))
test_cases_longname = [tc.longname for tc in test_cases]
tups = load_list_from_file(NGSITEST_PKL)
result = []
for sw, st, ln in tups:
if ln in test_cases_longname:
result.append(('ON', st, ln))
else:
result.append((sw, st, ln))
# Save the updated list back to the file
save_list_to_file(result, NGSITEST_PKL)
def off_collections(args):
# Get a list with test cases for each collection to off
test_cases=[]
for collection in args.collections:
test_cases.extend(get_test_cases_by_directory(collection))
test_cases_longname = [tc.longname for tc in test_cases]
tups = load_list_from_file(NGSITEST_PKL)
result = []
for sw, st, ln in tups:
if ln in test_cases_longname:
result.append(('OFF', st, ln))
else:
result.append((sw, st, ln))
# Save the updated list back to the file
save_list_to_file(result, NGSITEST_PKL)
def update_cases(args):
try:
directory_path = './'
all_test_cases = get_test_cases_by_directory(directory_path)
all_test_cases = [test.longname for test in all_test_cases]
registered_test_cases = load_list_from_file(NGSITEST_PKL)
updated_test_cases = []
for tup in registered_test_cases:
sw, st, test = tup
if test in all_test_cases:
if sw == 'MISSING':
updated_test_cases.append(('NEW', 'PENDING', test))
else:
updated_test_cases.append(tup)
all_test_cases.remove(test)
else:
updated_test_cases.append(('MISSING', st, test))
for test in all_test_cases:
updated_test_cases.append(('NEW', 'PENDING', test))
# Save the updated list back to the file
save_list_to_file(updated_test_cases, NGSITEST_PKL)
except FileNotFoundError as e:
print_red(f"Error: {e}")
def clean_cases(args):
try:
test_cases = load_list_from_file(NGSITEST_PKL)
missing_test_cases = [(sw, st, test) for sw, st, test in test_cases if sw == 'MISSING']
test_cases = [(sw, st, test) for sw, st, test in test_cases if sw != 'MISSING']
for tup in missing_test_cases:
print_test_case(tup)
save_list_to_file(test_cases, NGSITEST_PKL)
    except FileNotFoundError as e:
        print_red(f"Error: {e}")

def list_cases(args):
    """Print the registered test cases, optionally filtered by flag."""
try:
tups = load_list_from_file(NGSITEST_PKL)
args.flags = [flag.upper() for flag in args.flags]
if 'ALL' in args.flags:
for tup in tups:
print_test_case(tup)
else:
for sw, st, ln in tups:
if sw in args.flags or st in args.flags:
print_test_case((sw, st, ln))
    except FileNotFoundError as e:
        print_red(f"Error: {e}")

def run_cases(args):
    """Run the selected test cases and update their PASSED/FAILED state."""
    def set_suite(suite, include):
        # Keep only the tests whose long name is in the include list
        suite.tests = [test for test in suite.tests if test.longname in include]
        suite.suites = [set_suite(subsuite, include) for subsuite in suite.suites]
        return suite
runnable_test_cases = []
for tc in args.test_cases:
if tc.upper() in ['ON', 'OFF', 'NEW', 'MISSING', 'FAILED', 'PASSED', 'PENDING']:
runnable_test_cases += filter_test_cases([tc.upper()])
else:
runnable_test_cases.append(tc)
suite = get_suite('./')
suite = set_suite(suite, runnable_test_cases)
    # Keep only the suites that still contain selected tests
    suite.suites = [s for s in suite.suites if s.test_count > 0]
result = suite.run(console='quiet', listener=[CustomConsoleListener(), ErrorListener()])
# Update test state for tests at NGSITEST_PKL
tups = load_list_from_file(NGSITEST_PKL)
    def collect_passed(suite):
        # Recursively gather the long names of all passed tests
        passed = [test.longname for test in suite.tests if test.passed]
        for subsuite in suite.suites:
            passed.extend(collect_passed(subsuite))
        return passed

    passed_tests = collect_passed(result.suite)
    tups = [(sw, ('PASSED' if ln in passed_tests else 'FAILED') if ln in runnable_test_cases else st, ln)
            for sw, st, ln in tups]
    # Save the updated list back to the file
    save_list_to_file(tups, NGSITEST_PKL)

def main():
    """Entry point of the tsm command-line utility."""
parser = argparse.ArgumentParser(description='ngsitest command-line utility')
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Subparser for the 'cases' command
cases_parser = subparsers.add_parser('cases', help='Test Cases Command')
    # Use a distinct dest so the subcommand does not clobber args.command
    cases_subparsers = cases_parser.add_subparsers(dest='subcommand', help='Available commands')
on_cases_parser = cases_subparsers.add_parser('on', help='Switch on test cases')
on_cases_parser.add_argument('test_cases',
nargs='*', choices=['iterative', 'all'] + filter_test_cases(['OFF', 'NEW']) + [None],
help='Test cases to switch on')
on_cases_parser.set_defaults(handler=on_cases)
off_cases_parser = cases_subparsers.add_parser('off', help='Switch off test cases')
off_cases_parser.add_argument('test_cases',
nargs='*', choices=['iterative', 'all'] + filter_test_cases(['ON', 'NEW']) + [None],
help='Test cases to switch off')
off_cases_parser.set_defaults(handler=off_cases)
    list_cases_parser = cases_subparsers.add_parser('list',
                                                    help='List on (green/red/white), off (dark gray), new (orange) and missing (blue) test cases')
    list_cases_parser.add_argument('flags',
                                   nargs='*', choices=['all', 'on', 'off', 'new', 'missing', 'passed', 'failed', 'pending'],
                                   help='Flags to filter the listed test cases')
    list_cases_parser.set_defaults(handler=list_cases)
    run_cases_parser = cases_subparsers.add_parser('run', help='Run the selected test cases (by name or by flag)')
run_cases_parser.add_argument('test_cases', nargs='*',
choices=['on', 'off', 'new', 'passed', 'failed', 'pending'] + filter_test_cases(['ON', 'OFF', 'NEW']) + [None],
help='Test cases to run')
run_cases_parser.set_defaults(handler=run_cases)
    update_cases_parser = cases_subparsers.add_parser('update', help='Update the registered list of test cases, marking new and missing ones')
update_cases_parser.set_defaults(handler=update_cases)
clean_cases_parser = cases_subparsers.add_parser('clean', help='Remove missing (red) test cases')
clean_cases_parser.set_defaults(handler=clean_cases)
# Subparser for the 'suites' command
suites_parser = subparsers.add_parser('suites', help='Test Suites Command')
    suites_subparsers = suites_parser.add_subparsers(dest='subcommand', help='Available commands')
on_suites_parser = suites_subparsers.add_parser('on', help='Switch on test suites')
on_suites_parser.add_argument('suites',
nargs='*', choices=['iterative', 'all'] + filter_test_suites(['OFF', 'NEW']) + [None],
help='Test suites to switch on')
on_suites_parser.set_defaults(handler=on_suites)
off_suites_parser = suites_subparsers.add_parser('off', help='Switch off test suites')
off_suites_parser.add_argument('suites',
nargs='*', choices=['iterative', 'all'] + filter_test_suites(['ON', 'NEW']) + [None],
help='Test suites to switch off')
off_suites_parser.set_defaults(handler=off_suites)
# Subparser for the 'collections' command
collections_parser = subparsers.add_parser('collections', help='Test Collections Command')
    collections_subparsers = collections_parser.add_subparsers(dest='subcommand', help='Available commands')
on_collections_parser = collections_subparsers.add_parser('on', help='Switch on test collections')
on_collections_parser.add_argument('collections',
nargs='*',
help='Test collections to switch on')
on_collections_parser.set_defaults(handler=on_collections)
off_collections_parser = collections_subparsers.add_parser('off', help='Switch off test collections')
off_collections_parser.add_argument('collections',
nargs='*',
help='Test collections to switch off')
off_collections_parser.set_defaults(handler=off_collections)
# Enable argcomplete for the parser
argcomplete.autocomplete(parser)
args = parser.parse_args()
    handler = getattr(args, 'handler', None)
    if handler:
        handler(args)
    else:
        print("No command specified.")

if __name__ == '__main__':
main()