# -*- Coding: utf-8 -*- from __future__ import division # Import floating-point division (1/4=0.25) instead of Euclidian division (1/4=0) import os import glob import sys import logging from regex_list import * MSG_PATH = "" IN_FILE_PATH = "" OUT_FILE_PATH = "" out_streams = object() logger = object() parser = object() data_ctx = dict() def process_command_line_args(): global IN_FILE_PATH print(f"Arguments count: {len(sys.argv)}") for i, arg in enumerate(sys.argv): print(f"Argument {i:>6}: {arg}") IN_FILE_PATH = sys.argv[1] # End of function process_command_line_args def process_line(line): global MSG_PATH, out_streams, logger, parser, data_ctx match = parser.parse_line(line) if match != None: logger.info(f"Matching: {match}") if 'testcase' in match: logger.info('Got testcase') out_streams.write('@startuml\n') out_streams.write(f"title {match['testcase']}\n") out_streams.write("/'Editor: https://plantuml-editor.kkeisuke.com/ '/\n") out_streams.write("/'Syntax: http://plantuml.com/fr/sequence-diagram '/\n") out_streams.write('!include etsi-style.iuml\n') out_streams.write('!pragma teoz true\n') out_streams.write('autonumber\n') out_streams.write('participant system\n') out_streams.write('box "TTCN-3 Environment" #LightBlue\n') elif 'operation' in match: logger.info('Got operation') if match['operation'] == 'mapped': out_streams.write(f"participant \"{match['comp']}:{match['protocol_port']}\"\n") data_ctx['protocol_port'].append(f"{match['comp']}:{match['protocol_port']}") else: if match['protocol_port'] != 'syncPort': out_streams.write(f"\"{match['comp']}:{match['protocol_port']}\" --> system: unmap\n") elif 'request' in match: logger.info('Got request') if data_ctx['box_ended'] == False: out_streams.write('endbox\n') data_ctx['box_ended'] = True for p in data_ctx['protocol_port']: out_streams.write(f"\"{p}\" --> system: map\n") out_streams.write(f"\"{match['comp']}:{match['port']}\" -[#green]> system: @{match['module']}.{match['message']}\n") s = match['request'].replace(",", ",\\n").replace("\"", "").strip() if len(s) > 128: n = match['hours'] + "_" + match['minutes'] + "_" + match['seconds'] + "_" + match['milliseconds'] + ".txt" n = os.path.join(MSG_PATH, n) with open(n, "w") as msg: msg.write(s) s = f'[[file://{n} request := ...]]' #{Click to read the message} out_streams.write(f"note right: {s}\n") out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n") elif 'response' in match: logger.info('Got response') if data_ctx['box_ended'] == False: out_streams.write('endbox\n') data_ctx['box_ended'] = True out_streams.write(f"system -[#green]> \"{match['comp']}:{match['port']}\": @{match['module']}.{match['message']}\n") s = match['response'].replace(",", ",\\n").replace("\"", "").strip() if len(s) > 128: n = match['hours'] + "_" + match['minutes'] + "_" + match['seconds'] + "_" + match['milliseconds'] + ".txt" n = os.path.join(MSG_PATH, n) with open(n, "w") as msg: msg.write(s) s = f'[[file://{n} response := ...]]' #{Click to read the message} out_streams.write(f"note right: {s}\n") out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n") elif 'timer_name' in match: logger.info('Got timer') if match['timer_name'] == 'tc_sync': pass else: for p in data_ctx['protocol_port']: if p.startswith(match['comp']): out_streams.write(f"\"{p}\" -> \"{p}\": {match['start_stop']}, duration: {match['duration']}\n") break out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n") elif 'verdict' in match: logger.info('Got verdict') if 'ptcid' in match and match['ptcid'] != None: s = match['ptcid'].split('(') s = s[1].split(')') for p in data_ctx['protocol_port']: if p.startswith(s[0]): if match['verdict'] == 'fail': out_streams.write(f"hnote over \"{p}\" #red: fail\n") elif match['verdict'] == 'pass': out_streams.write(f"hnote over \"{p}\" #green: pass\n") elif match['verdict'] == 'inconc': out_streams.write(f"hnote across #yellow: inconc\n") else: out_streams.write(f"hnote over \"{p}\" #gray: error\n") out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n") break elif 'final_verdict' in match: logger.info('Got verdict') if match['final_verdict'] == 'fail': out_streams.write(f"hnote across #red: fail\n") elif match['final_verdict'] == 'pass': out_streams.write(f"hnote across #green: pass\n") elif match['final_verdict'] == 'inconc': out_streams.write(f"hnote across #yellow: inconc\n") else: out_streams.write(f"hnote across #gray: error\n") out_streams.write(f"note left: {match['hours']}:{match['minutes']}:{match['seconds']}.{match['milliseconds']}\n") out_streams.write('@enduml\n') else: logger.info('Got unsupported item') # End of 'if' statement out_streams.flush() # End of function process_line def main(): global MSG_PATH, OUT_FILE_PATH, out_streams, logger, parser, data_ctx process_command_line_args() # Initialize the logger logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) l = logging.StreamHandler() l.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(l) # Prepare outputs OUT_FILE_PATH = sys.argv[2] CURRENT_PATH = os.getcwd() MSG_PATH = os.path.join(CURRENT_PATH, "msgs") logger.info(f'CURRENT_PATH:{CURRENT_PATH} - MSG_PATH{MSG_PATH}') if os.path.exists(MSG_PATH): files = glob.glob(os.path.join(MSG_PATH, '*')) for f in files: os.remove(f) else: os.mkdir(MSG_PATH) # Setup the regex parser = regex_list(logger) parser.reset_state() data_ctx = dict() data_ctx['protocol_port'] = [] data_ctx['box_ended'] = False print(f"IN_FILE_PATH= {IN_FILE_PATH}") with open(IN_FILE_PATH, "r") as in_stream: with open(OUT_FILE_PATH, "w") as out_streams: line = in_stream.readline() while line != '': process_line(line) line = in_stream.readline() # End of 'for' statement # End of 'with' statement # End of 'with' statement # Check if file is not empty if os.path.getsize(OUT_FILE_PATH) == 0: logger.error(f'Empty result, please check that FileMask is properly set: ERROR | WARNING | USER | MATCHING | EXECUTOR_RUNTIME | VERDICTOP | PORTEVENT | TIMEROP | TESTCASE') else: os.system('java -DPLANTUML_LIMIT_SIZE=8192 -jar ./plantuml.jar -svg {}'.format(OUT_FILE_PATH)) # End of main function if __name__ == "__main__": main() # End of file